我正在尝试使用IdempotentReceiverInterceptor来防止我的集成流发出它已经产生的消息。但是,似乎IdempotentReceiverInterceptor#invoke只想要扩展MessageHandler(某种程度上)或具有第一个参数是Message的名为“ handleMessage”的方法的组件。我想对消息进行重复数据删除,如果它失败了,请执行一些其他处理。

我的问题有两个:


有什么理由不应该在任何情况下使用此建议
零件;变压器,GenericHandler等?
因为它只允许在MessageHandler或具有handleMessage方法的任何对象上运行,所以我尝试使用AbstractReplyProducingMessageHandler(因为我需要执行其他处理),但这也不起作用。


我觉得我误解了IdempotentReceiverInterceptor的正确用法,我应该能够在几乎所有组件上使用它,但是实现似乎与我不同意。我应该如何使用它不正确还是我使用不正确?我知道我可能只是将MetadataStoreSelector作为过滤器放入流程中,但是我正在尝试按照SI的建议进行操作。

任何帮助表示赞赏。这是我期望可以使用的示例流程。

return IntegrationFlows
    .from(messageProducer)
    .<String, UUID>transform(s -> UUID.fromString(s))
    .claimCheckOut(messageStore)
    .handle(
        new AbstractReplyProducingMessageHandler() {

          @Override
          protected Object handleRequestMessage(final Message<?> requestMessage) {
            return requestMessage;
          }

        },
        spec -> {
          spec.advice(idempotentReceiverInterceptor);
        })
    // do some more stuff
    .transform(transformer)
    .handle(loggingHandler)
    .get();


这是日志消息,表明它没有

This advice org.springframework.integration.handler.advice.IdempotentReceiverInterceptor can only be used for MessageHandlers; an attempt to advise method 'toString' in 'org.springframework.integration.handler.AbstractReplyProducingMessageHandler$AdvisedRequestHandler' is ignored


编辑:对于那些来这里寻找解决方案的人,这几乎是我最终要做的

return IntegrationFlows
    .from(messageProducer)
    .<String, UUID>transform(s -> UUID.fromString(s))
    .claimCheckOut(messageStore)
    .filter(metadataStoreSelector)
    // do some more stuff
    .transform(transformer)
    .handle(loggingHandler)
    .get();

最佳答案

根据Reference Manual


  这是一个AOP忠告,它应用于MessageHandler.handleMessage()方法,并且可以根据其配置过滤请求消息或将其标记为重复项。


源代码说:

boolean isMessageHandler = invocationThis != null && invocationThis instanceof MessageHandler;
boolean isMessageMethod = method.getName().equals("handleMessage")
        && (arguments.length == 1 && arguments[0] instanceof Message);


换句话说,MessageHandler实现只能受IdempotentReceiverInterceptor影响。

那是一个

您不用担心TransformerGenericHandler。框架最终以特定的MessageHandler包装结束,例如ServiceActivatingHandlerMessageTransformingHandler等。所有这些仅仅是因为它就像Spring Integration中的合同:channel -> endpoint -> messageHandler

那是两个。

由于IdempotentReceiverInterceptor的主要前提是将逻辑准确地应用于使用者端点(请参见其文档),因此您不能在spec.advice()中使用它,因为正如您所期望的那样,它恰好适用于handleRequestMessage()。但这对于IdempotentReceiverInterceptor不起作用。

是的...不幸的是,没有通过Java DSL样式使用它的简单方法。尽管您可以通过消息通道区分流,并将@ServiceActivator@Transformer)与@IdempotentReceiver一起提取到单独的@Service中,但是直到我们找出“原始” Java配置的解决方案为止。

随便举起JIRA票。我认为ConsumerEndpointFactoryBean应该将IdempotentReceiverInterceptoradvice中的所有其他字符区分开,并最终将其正确地应用于handleMessage。因此,您期望的配置将起作用。

感谢您发现这一点!

10-06 10:42