我正在尝试使用IdempotentReceiverInterceptor来防止我的集成流发出它已经产生的消息。但是,似乎IdempotentReceiverInterceptor#invoke只想要扩展MessageHandler(某种程度上)或具有第一个参数是Message的名为“ handleMessage”的方法的组件。我想对消息进行重复数据删除,如果它失败了,请执行一些其他处理。
我的问题有两个:
有什么理由不应该在任何情况下使用此建议
零件;变压器,GenericHandler等?
因为它只允许在MessageHandler或具有handleMessage方法的任何对象上运行,所以我尝试使用AbstractReplyProducingMessageHandler(因为我需要执行其他处理),但这也不起作用。
我觉得我误解了IdempotentReceiverInterceptor的正确用法,我应该能够在几乎所有组件上使用它,但是实现似乎与我不同意。我应该如何使用它不正确还是我使用不正确?我知道我可能只是将MetadataStoreSelector作为过滤器放入流程中,但是我正在尝试按照SI的建议进行操作。
任何帮助表示赞赏。这是我期望可以使用的示例流程。
return IntegrationFlows
.from(messageProducer)
.<String, UUID>transform(s -> UUID.fromString(s))
.claimCheckOut(messageStore)
.handle(
new AbstractReplyProducingMessageHandler() {
@Override
protected Object handleRequestMessage(final Message<?> requestMessage) {
return requestMessage;
}
},
spec -> {
spec.advice(idempotentReceiverInterceptor);
})
// do some more stuff
.transform(transformer)
.handle(loggingHandler)
.get();
这是日志消息,表明它没有
This advice org.springframework.integration.handler.advice.IdempotentReceiverInterceptor can only be used for MessageHandlers; an attempt to advise method 'toString' in 'org.springframework.integration.handler.AbstractReplyProducingMessageHandler$AdvisedRequestHandler' is ignored
编辑:对于那些来这里寻找解决方案的人,这几乎是我最终要做的
return IntegrationFlows
.from(messageProducer)
.<String, UUID>transform(s -> UUID.fromString(s))
.claimCheckOut(messageStore)
.filter(metadataStoreSelector)
// do some more stuff
.transform(transformer)
.handle(loggingHandler)
.get();
最佳答案
根据Reference Manual:
这是一个AOP忠告,它应用于MessageHandler.handleMessage()
方法,并且可以根据其配置过滤请求消息或将其标记为重复项。
源代码说:
boolean isMessageHandler = invocationThis != null && invocationThis instanceof MessageHandler;
boolean isMessageMethod = method.getName().equals("handleMessage")
&& (arguments.length == 1 && arguments[0] instanceof Message);
换句话说,
MessageHandler
实现只能受IdempotentReceiverInterceptor
影响。那是一个
您不用担心
Transformer
,GenericHandler
。框架最终以特定的MessageHandler
包装结束,例如ServiceActivatingHandler
,MessageTransformingHandler
等。所有这些仅仅是因为它就像Spring Integration中的合同:channel -> endpoint -> messageHandler
。那是两个。
由于
IdempotentReceiverInterceptor
的主要前提是将逻辑准确地应用于使用者端点(请参见其文档),因此您不能在spec.advice()
中使用它,因为正如您所期望的那样,它恰好适用于handleRequestMessage()
。但这对于IdempotentReceiverInterceptor
不起作用。是的...不幸的是,没有通过Java DSL样式使用它的简单方法。尽管您可以通过消息通道区分流,并将
@ServiceActivator
(@Transformer
)与@IdempotentReceiver
一起提取到单独的@Service
中,但是直到我们找出“原始” Java配置的解决方案为止。随便举起JIRA票。我认为
ConsumerEndpointFactoryBean
应该将IdempotentReceiverInterceptor
与advice
中的所有其他字符区分开,并最终将其正确地应用于handleMessage
。因此,您期望的配置将起作用。感谢您发现这一点!