问题描述
我有一个 JDBC:inbound-channel-adapter:设置max-rows-per-poll"动态以限制在通道上传递的消息.
I have a JDBC:inbound-channel-adapter : To set the 'max-rows-per-poll' dynamic to throttle the messages getting passed on the channel.
我有一个容量为 200 的 QueueChannel.入站通道适配器会将消息发送到此 QueueChannel.我想根据 QueueChannel 的 RemainingCapacity 设置max-rows-per-poll"值.
I have a QueueChannel which has a capacity of 200. The inbound-channel-adapter would be sending the message to this QueueChannel. I would like to set the 'max-rows-per-poll' value depending on the RemainingCapacity of the QueueChannel.
为此,我尝试在 Bean 中注入 QueueChannel,但在部署 war 文件时出现错误.
For this I tried to Inject the QueueChannel in a Bean but I get the error when deploying the war file.
错误:由于 StateConversionError,无法注入 QueueChannel.
Error: Cannot Inject the QueueChannel due to StateConversionError.
有没有其他方法可以实现这一点.
Is there any other way I could achieve this.
更新:我使用的是 Spring-Integration-2.2.0.RC2
Update : I am using Spring-Integration-2.2.0.RC2
这是 jdbc-inbound-adapter 的配置:
This is the config for jdbc-inbound-adapter:
<si-jdbc:inbound-channel-adapter id ="jdbcInboundAdapter" channel="queueChannel" data-source="myDataSource" auto-startup="true" query="${select.query}"
update="${update.query}" max-rows-per-poll="100" row-mapper="rowMapper" update-per-row="true">
<si:poller fixed-rate="5000">
<si:transactional/>
<si:advice-chain>
<bean class="foo.bar.InboundAdapterPollingConfigurationImpl"/>
</si:advice-chain>
</si:poller>
</si-jdbc:inbound-channel-adapter>
豆子:
@Service
public class InboundAdapterPollingConfigurationImpl implements InboundAdapterPollingConfiguration{
private static final Logger logger = LoggerFactory.getLogger(InboundAdapterPollingConfigurationImpl.class);
@Autowired
QueueChannel queueChannel;
@Autowired
SourcePollingChannelAdapter jdbcInboundAdapter;
public void setJdbcInboundAdapterMaxRowsPerPoll(){
String size = String.valueOf(queueChannel.getRemainingCapacity());
DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(jdbcInboundAdapter);
directFieldAccessor.setPropertyValue("setMaxRowsPerPoll", size);
String maxRowsPerPollSize = (String)directFieldAccessor.getPropertyValue("setMaxRowsPerPoll");
System.out.println(maxRowsPerPollSize);
}
}
问题是如何从通知链调用 InboundAdapterPollingConfigurationImpl.setJdbcInboundAdapterMaxRowsPerPoll() 方法.很抱歉这个幼稚的问题,但这是我第一次使用建议链.我也在寻找一个例子,但还不够幸运.
The question is how to call the InboundAdapterPollingConfigurationImpl.setJdbcInboundAdapterMaxRowsPerPoll() method from the advice chain. Sorry for the naive question but t is my first time using the advice-chain. Also I am searching for an example but was not lucky yet.
更新 2:执行此操作时出现以下错误:
Update2:Got the below error when this is executed:
JdbcPollingChannelAdapter source = (JdbcPollingChannelAdapter)dfa.getPropertyValue("source");
错误:
java.lang.ClassCastException: $Proxy547 cannot be cast to org.springframework.integration.jdbc.JdbcPollingChannelAdapter –
我有 JDK1.6_26.我在其中一篇文章中读到过这种情况发生在 JDK1.6 的早期版本中.
I have the JDK1.6_26. I read in one of the posts that this is happening in the early versions of JDK1.6.
推荐答案
好吧,让我们试着调查一下!
Well, let's try to investigate!
max-rows-per-poll
是具有适当设置器的JdbcPollingChannelAdapter
的易失性属性.- 至于
JdbcPollingChannelAdapter
只是在TaskScheduler.schedule()
的倡议下在其receive()
方法中执行的东西看起来像改变该属性在运行时是安全的.这是我们任务的第一点 QueueChannel
具有属性getQueueSize()
.至于capacity
是您的配置选项,因此您可以简单地计算max-rows-per-poll
的值- 现在如何让它发挥作用?实际上,您只对每次轮询中
max-rows-per-poll
的值感兴趣.所以,我们应该以某种方式介入轮询器或轮询任务.好吧,有
advice-chain
子元素,我们可以写一些Advice
,它应该改变JdbcPollingChannelAdapter#setMaxRowsPerPoll
在调用receive()
之前,值应该基于QueueChannel#getQueueSize()
- 将您的
QueueChannel
注入到您的Advice
的 bean 中 现在有些不好的地方:如何注入
JdbcPollingChannelAdapter
bean?仅从 Spring Integration 3.0 开始,我们提供了一个钩子来将MessageSources
注册为 bean.从这里开始编写这段代码就足够了:
max-rows-per-poll
is a volatile property ofJdbcPollingChannelAdapter
with appropriate setter.- As far as
JdbcPollingChannelAdapter
does the stuff within itsreceive()
method just on the initiative ofTaskScheduler.schedule()
looks like changing that property at runtime is safe. It is a first point for our task QueueChannel
has propertygetQueueSize()
. As far as acapacity
is your configuration option, so you can simply calculate a value formax-rows-per-poll
- And now how to get it worked? Actually you are interested in the value for
max-rows-per-poll
just on each poll. So, we should somehow to wedge into poller or polling task. Well,<poller>
hasadvice-chain
sub-element and we can write someAdvice
, which should changeJdbcPollingChannelAdapter#setMaxRowsPerPoll
before invokingreceive()
and the value should be based onQueueChannel#getQueueSize()
- Inject your
QueueChannel
to the bean of yourAdvice
And now some bad point: how to inject
JdbcPollingChannelAdapter
bean? We provide a hook to registerMessageSources
as beans just only since Spring Integration 3.0. From here it's just enough to write this code:
@Autowired@Qualifier("jdbcAdapter.source")私有 JdbcPollingChannelAdapter messageSource;
@Autowired@Qualifier("jdbcAdapter.source")private JdbcPollingChannelAdapter messageSource;
我们将在本周发布 3.0.GA.因此,在 Sring Integration 3.0 之前,让我不要考虑反射森林".但是,您可以在注入的 SourcePollingChannelAdapter
bean 上使用 DirectFieldAccessor
来做到这一点.
We are going to release 3.0.GA this week. So, let me do not consider the reflection 'forest' prior to Sring Integration 3.0. However you can do it using DirectFieldAccessor
on injected SourcePollingChannelAdapter
bean.
更新
您的建议
可能如下所示:
public class MyAdvice implements MethodInterceptor {
@Autowired
QueueChannel queueChannel;
@Autowired
SourcePollingChannelAdapter jdbcInboundAdapter;
Object invoke(MethodInvocation invocation) throws Throwable {
DirectFieldAccessor dfa = new DirectFieldAccessor(jdbcInboundAdapter);
JdbcPollingChannelAdapter source = (JdbcPollingChannelAdapter) dfa.getPropertyValue("source");
source.setMaxRowsPerPoll(queueChannel.getRemainingCapacity());
return invocation.proceed();
}
}
理论在这里:http://docs.spring.io/spring/docs/3.2.5.RELEASE/spring-framework-reference/htmlsingle/#aop
这篇关于Spring Integration jdbc:inbound-channel-adapter - 将 max-rows-per-poll 动态设置为节流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!