本文介绍了Spring Integration jdbc:inbound-channel-adapter - 将 max-rows-per-poll 动态设置为节流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 JDBC:inbound-channel-adapter:设置max-rows-per-poll"动态以限制在通道上传递的消息.

I have a JDBC:inbound-channel-adapter : To set the 'max-rows-per-poll' dynamic to throttle the messages getting passed on the channel.

我有一个容量为 200 的 QueueChannel.入站通道适配器会将消息发送到此 QueueChannel.我想根据 QueueChannel 的 RemainingCapacity 设置max-rows-per-poll"值.

I have a QueueChannel which has a capacity of 200. The inbound-channel-adapter would be sending the message to this QueueChannel. I would like to set the 'max-rows-per-poll' value depending on the RemainingCapacity of the QueueChannel.

为此,我尝试在 Bean 中注入 QueueChannel,但在部署 war 文件时出现错误.

For this I tried to Inject the QueueChannel in a Bean but I get the error when deploying the war file.

错误:由于 StateConversionError,无法注入 QueueChannel.

Error: Cannot Inject the QueueChannel due to StateConversionError.

有没有其他方法可以实现这一点.

Is there any other way I could achieve this.

更新:我使用的是 Spring-Integration-2.2.0.RC2

Update : I am using Spring-Integration-2.2.0.RC2

这是 jdbc-inbound-adapter 的配置:

This is the config for jdbc-inbound-adapter:

<si-jdbc:inbound-channel-adapter id ="jdbcInboundAdapter" channel="queueChannel" data-source="myDataSource" auto-startup="true" query="${select.query}"
update="${update.query}" max-rows-per-poll="100"  row-mapper="rowMapper" update-per-row="true">
 <si:poller fixed-rate="5000">
    <si:transactional/>
     <si:advice-chain>
           <bean class="foo.bar.InboundAdapterPollingConfigurationImpl"/>
     </si:advice-chain>
  </si:poller>
</si-jdbc:inbound-channel-adapter>

豆子:

    @Service
public class InboundAdapterPollingConfigurationImpl implements InboundAdapterPollingConfiguration{

    private static final Logger logger = LoggerFactory.getLogger(InboundAdapterPollingConfigurationImpl.class);

    @Autowired
    QueueChannel queueChannel;
    @Autowired
    SourcePollingChannelAdapter jdbcInboundAdapter;

    public void setJdbcInboundAdapterMaxRowsPerPoll(){
        String size = String.valueOf(queueChannel.getRemainingCapacity());
        DirectFieldAccessor directFieldAccessor = new DirectFieldAccessor(jdbcInboundAdapter);      
        directFieldAccessor.setPropertyValue("setMaxRowsPerPoll", size);        
        String maxRowsPerPollSize = (String)directFieldAccessor.getPropertyValue("setMaxRowsPerPoll");
        System.out.println(maxRowsPerPollSize);
    }
}

问题是如何从通知链调用 InboundAdapterPollingConfigurationImpl.setJdbcInboundAdapterMaxRowsPerPoll() 方法.很抱歉这个幼稚的问题,但这是我第一次使用建议链.我也在寻找一个例子,但还不够幸运.

The question is how to call the InboundAdapterPollingConfigurationImpl.setJdbcInboundAdapterMaxRowsPerPoll() method from the advice chain. Sorry for the naive question but t is my first time using the advice-chain. Also I am searching for an example but was not lucky yet.

更新 2:执行此操作时出现以下错误:

Update2:Got the below error when this is executed:

JdbcPollingChannelAdapter source = (JdbcPollingChannelAdapter)dfa.getPropertyValue("source"); 

错误:

 java.lang.ClassCastException: $Proxy547 cannot be cast to org.springframework.integration.jdbc.JdbcPollingChannelAdapter – 

我有 JDK1.6_26.我在其中一篇文章中读到过这种情况发生在 JDK1.6 的早期版本中.

I have the JDK1.6_26. I read in one of the posts that this is happening in the early versions of JDK1.6.

推荐答案

好吧,让我们试着调查一下!

Well, let's try to investigate!

  1. max-rows-per-poll 是具有适当设置器的 JdbcPollingChannelAdapter 的易失性属性.
  2. 至于 JdbcPollingChannelAdapter 只是在 TaskScheduler.schedule() 的倡议下在其 receive() 方法中执行的东西看起来像改变该属性在运行时是安全的.这是我们任务的第一点
  3. QueueChannel 具有属性 getQueueSize().至于 capacity 是您的配置选项,因此您可以简单地计算 max-rows-per-poll
  4. 的值
  5. 现在如何让它发挥作用?实际上,您只对每次轮询中 max-rows-per-poll 的值感兴趣.所以,我们应该以某种方式介入轮询器或轮询任务.好吧,advice-chain 子元素,我们可以写一些 Advice,它应该改变 JdbcPollingChannelAdapter#setMaxRowsPerPoll 在调用 receive() 之前,值应该基于 QueueChannel#getQueueSize()
  6. 将您的 QueueChannel 注入到您的 Advice 的 bean 中
  7. 现在有些不好的地方:如何注入 JdbcPollingChannelAdapter bean?仅从 Spring Integration 3.0 开始,我们提供了一个钩子来将 MessageSources 注册为 bean.从这里开始编写这段代码就足够了:

  1. max-rows-per-poll is a volatile property of JdbcPollingChannelAdapter with appropriate setter.
  2. As far as JdbcPollingChannelAdapter does the stuff within its receive() method just on the initiative of TaskScheduler.schedule() looks like changing that property at runtime is safe. It is a first point for our task
  3. QueueChannel has property getQueueSize(). As far as a capacity is your configuration option, so you can simply calculate a value for max-rows-per-poll
  4. And now how to get it worked? Actually you are interested in the value for max-rows-per-poll just on each poll. So, we should somehow to wedge into poller or polling task. Well, <poller> has advice-chain sub-element and we can write some Advice, which should change JdbcPollingChannelAdapter#setMaxRowsPerPoll before invoking receive() and the value should be based on QueueChannel#getQueueSize()
  5. Inject your QueueChannel to the bean of your Advice
  6. And now some bad point: how to inject JdbcPollingChannelAdapter bean? We provide a hook to register MessageSources as beans just only since Spring Integration 3.0. From here it's just enough to write this code:

@Autowired@Qualifier("jdbcAdapter.source")私有 JdbcPollingChannelAdapter messageSource;

@Autowired@Qualifier("jdbcAdapter.source")private JdbcPollingChannelAdapter messageSource;

我们将在本周发布 3.0.GA.因此,在 Sring Integration 3.0 之前,让我不要考虑反射森林".但是,您可以在注入的 SourcePollingChannelAdapter bean 上使用 DirectFieldAccessor 来做到这一点.

We are going to release 3.0.GA this week. So, let me do not consider the reflection 'forest' prior to Sring Integration 3.0. However you can do it using DirectFieldAccessor on injected SourcePollingChannelAdapter bean.

更新

您的建议可能如下所示:

public class MyAdvice implements MethodInterceptor {
       @Autowired
       QueueChannel queueChannel;

       @Autowired
       SourcePollingChannelAdapter jdbcInboundAdapter; 

      Object invoke(MethodInvocation invocation) throws Throwable {
            DirectFieldAccessor dfa = new DirectFieldAccessor(jdbcInboundAdapter);
            JdbcPollingChannelAdapter source = (JdbcPollingChannelAdapter) dfa.getPropertyValue("source");
            source.setMaxRowsPerPoll(queueChannel.getRemainingCapacity());
            return invocation.proceed();
      }
}

理论在这里:http://docs.spring.io/spring/docs/3.2.5.RELEASE/spring-framework-reference/htmlsingle/#aop

这篇关于Spring Integration jdbc:inbound-channel-adapter - 将 max-rows-per-poll 动态设置为节流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

09-18 08:49