本文介绍了当代理不可用时,消息不会出现在Spring Integration(Kafka)ErrorChannel中的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Spring Integration 处理一个基于 Kafka 的简单项目,我们要求在Broker关闭时,消息将传递到 ErrorChannel strong>,我们可以将它们/保存为死信"等.

I am working with a simple Kafka based project using Spring Integration and we require that when the Broker is down, messages will pass into the ErrorChannel and we can deal with them /save as 'dead-letters' etc.

我们得到的是无数种异常:

What we are getting is a countless run of Exceptions:

2017-09-19 17:14:19.651 DEBUG 12171 --- [ad | producer-1] o.apache.kafka.common.network.Selector   : Connection with localhost/127.0.0.1 disconnected

java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_131]
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_131]

但是错误通道引用了 :-/

But the error channel is not referenced :-/

我尝试将其连接起来,但无济于事-这是我的应用上下文的 part :

I have tried to hook it up, but to no avail - here is part of my app-context:

<bean id="channelExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="1"/>
    <property name="maxPoolSize" value="10"/>
    <property name="queueCapacity" value="1000"/>
</bean>

<int:channel id="producingChannel" >
    <int:dispatcher task-executor="channelExecutor" />
</int:channel>

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="kafkaTemplate"
                                    auto-startup="true"
                                    channel="producingChannel"
                                    topic="${kafka.topic}">
</int-kafka:outbound-channel-adapter>

<int:service-activator input-channel="errorChannel" ref="errorLogger" method="logError" />
<bean id="errorLogger" class="uk.co.sainsburys.integration.service.ErrorLogger" />

<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg ref="producerConfigs"/> <!-- producerConfigs piece is NOT included! -->
</bean> 

可悲的是,我不是Spring Integration的专家-有什么想法我做错了吗?

Sadly, I am not an expert at Spring Integration - any ideas what I am doing wrong?

感谢您的帮助.

推荐答案

到目前为止,所有内容都是正确的.默认情况下,您缺少 KafkaProducerMessageHandler 中的 async 行为的事实的问题:

Everything is correct so far. The problem that you are missing the fact of async behavior by default in the KafkaProducerMessageHandler:

/**
 * A {@code boolean} indicating if the {@link KafkaProducerMessageHandler}
 * should wait for the send operation results or not. Defaults to {@code false}.
 * In {@code sync} mode a downstream send operation exception will be re-thrown.
 * @param sync the send mode; async by default.
 * @since 2.0.1
 */
public void setSync(boolean sync) {

因此,请考虑在< int-kafka:outbound-channel-adapter> 上使用 sync ="true" 属性.

So, consider to use sync="true" attribute on the <int-kafka:outbound-channel-adapter>.

此外,我们推出了最新的最新版本:

In addition, with the latest upcoming versions we have introduced:

  <xsd:attribute name="send-failure-channel" type="xsd:string">
            <xsd:annotation>
                <xsd:documentation><![CDATA[
                    Specifies the channel to which an ErrorMessage for a failed send will be sent.
                ]]></xsd:documentation>
                <xsd:appinfo>
                    <tool:annotation kind="ref">
                        <tool:expected-type type="org.springframework.messaging.MessageChannel" />
                    </tool:annotation>
                </xsd:appinfo>
            </xsd:annotation>
        </xsd:attribute>

对于 async 行为捕获这些错误很有用.

which is useful for the async behavior to catch those errors.

这篇关于当代理不可用时,消息不会出现在Spring Integration(Kafka)ErrorChannel中的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

10-12 17:52