本文介绍了使用 CoD over Camel JMS 组件实现原生 websphere MQ的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在使用 Apache CAMEL 实现 Websphere MQ (WMQ) 连接器时遇到了很多困难,该连接器可以毫无例外地处理 MQ 交付确认 (CoD) 报告,不会产生不需要的响应数据报形式的副作用.最后,我让它按照我想要的方式工作,如果您习惯于编写本机 MQ 客户端,这是一种非常标准和常见的方式.我在同一主题的帖子中记录了该方法,但我发现该解决方案因复杂而臃肿,非常感谢任何建议或示例,以使实现更简洁、更优雅.

I had much difficulties implementing a Websphere MQ (WMQ) connector with Apache CAMEL that could handle MQ Confirmation of Delivery (CoD) reports without exceptions, neither side effects in the form of unwanted response datagrams. In the end I got it working the way I wanted, a much standard and common way if you are accustomed to writing native MQ clients. I document the method in a post to this same topic, but I find the solution bloated with intricacies and would very much appreciate any advice or example to make the implementation cleaner and more elegant.

我明白,问题的根源在于 MQ 设计请求-回复消息交换模式 (MEP) 的方式、JMS 规范的方式以及 JMS 组件中请求-回复 MEP 的 CAMEL 实现.三种不同的理念!

I understood that the problem takes its roots into the way MQ designed request-reply Message Exchange Pattern (MEP), versus the way JMS specifications did, versus CAMEL implementation of the Request-Reply MEP in its JMS Component. Three different philosophies!

  1. WMQ 具有 MessageType 标头(请参阅 MQMD 字段常量),请求值为 1,回复值为 2,数据报值为 8(单向 MEP).此外,值 4 用于标记 CoD(Conf. of Delivery)、PAN(Positive AckNowledge)和 NAN(Negative AckNowledge)形式的 Report 消息,这在消息流方面也进行了额外的 Reply信息.可以使用名为报告"的另一个标题字段为请求消息、回复或数据报请求 CoD、PAN 和 NAN 确认,其中可以组合所有报告变体的标志.附加标头字段ReplyToQ"和ReplyToQMgr"指定原始发件人期望报告和答复所在的队列和队列管理器,固定的 24 个字节CorrelId"字段 - 可选 - 可以帮助关联报告和回复原始数据报或请求消息.为了使它更复杂,确实可以用相同的原始消息 ID 和没有 CorrelID 发送回回复和报告,或者在 CorrelId 中提供原始消息 ID,或者在原始请求或数据报中已经指定时返回 CorrelId 值.IBM 提供了一个 WMQ 上的 JMS API,允许通过 WMQ 将普通 JMS 交换作为传输隧道传输(借助额外的消息头名称 MQRFH2),或将原生 MQ 消息映射到 JMS 消息,反之亦然.立>
  2. 另一方面,JMS 规范提供了一个可选的JMSReplyTo"标头字段和一个JMSCorrelationID",但将确切的 MEP 语义留给客户端应用程序;即在规范中说明:响应可能是可选的;由客户决定."
  3. CAMEL 具有 XML 或 Java DSL 中的路由"和内部交换对象模型,旨在支持 EIP 模式,其中 Request-Reply 模式.然后 CAMEL 在其 JMS 组件 中假设,如果设置了 JMSReplyTo 字段,则这必然是一个请求期望回复,导致 Exchange 的 Out 部分(如果 Out 为空,则修改 In 部分)返回到 JMSReplyTo 中定义的队列.
  1. WMQ features a MessageType header (see MQMD fields and constants) that bears value 1 for request, 2 for reply, and 8 for datagram (one way MEP). In addition, the value 4 is used to mark Report messages in the form of CoD (Conf. of Delivery), PAN (Positive AckNowledge) and NAN (Negative AckNowledge), which - in term of the message flow - also make an additional Reply message. The CoD, PAN, and NAN acks can be requested for either Request messages, Replies or Datagrams using another header field named 'Report' in which flags for all report variants can be combined. Additional header fields 'ReplyToQ' and 'ReplyToQMgr' specify the Queue and Queue Manager on which Reports and Replies are expected by the original sender, and a fixed 24 bytes 'CorrelId' field - optional - can help correlate Reports and Replies to the original Datagram or Request mesage. To make it more complex, one can indeed send back Replies and Reports with the same original Message ID and no CorrelID, or provide the original Message ID in the CorrelId, or return the CorrelId value when already specified in the original Request or Datagram. IBM provides a JMS API over WMQ, allowing either to tunnel plain JMS exchanges over WMQ as transport (with help from an extra message header name MQRFH2), or to map native MQ messages onto JMS messages and vice-versa.
  2. On the other hand, JMS specs provide an optional 'JMSReplyTo' header field and a 'JMSCorrelationID', but do leave the exact MEP semantics to client applications; namely stating in specs: "A response may be optional; it is up to the client to decide."
  3. CAMEL features 'routes' in XML or Java DSL and a inner Exchange Object model with the purpose to support EIP Patterns amongst which the Request-Reply pattern. CAMEL then assumes in its JMS Component that if the JMSReplyTo field is set, this is necessarily a Request expecting a Reply, causing the Out part (or modified In part if Out is empty) of an Exchange to be returned to the queue defined in JMSReplyTo.

我愿意通过远程 Websphere 队列管理器支持具有交付确认 (CoD) 报告的本地 MQ 消息交换,因此,除了事务和持久性(即无丢失、无重复)之外,还可以跟踪何时消息被消耗并在出现延迟时发出警报.

I was willing to support native MQ Message exchanges with Confirmation of Delivery (CoD) reports through a remote Websphere Queue Manager, so that, in addition to transaction and persistence (i.e. no loss, no duplicates) one can also track when a message is consumed and raise alerts in case of delays.

默认情况下,当来自队列的消息消耗完成时,Websphere 队列管理器会生成 CoD 报告.因此,没有任何特定设置,当 CAMEL 端点消耗时,远程 MQ 客户端发送带有 CoD 标志(然后是强制 ReplyToQ)的数据报,将从队列管理器返回作为 MQ 报告的第一个答复消息,然后是 CAMEL 显式返回的第二条(意外)回复消息,其中包含 CAMEL 路由末尾 Exchange 对象中剩余的任何内容,因为 CAMEL 假定存在请求-回复 EIP,因为存在 JMSReplyTo 字段(映射来自 MQ ReplyToQ 和 ReplyToQMgr,两者都需要支持 CoD 返回流).

By default, the Websphere Queue Manager generates CoD Reports when message consumption from the queue completes. Therefore, without any specific settings, a remote MQ client sending a datagram with the CoD flag (and the then compulsory ReplyToQ) will get a first reply as MQ Report back from the Queue Manager when the CAMEL endpoint consumes the message, followed by a second (unexpected) reply message explicitly returned by CAMEL and containing whatever is left in the Exchange object at the end of the CAMEL route, because CAMEL assumes a Request-Reply EIP given that the JMSReplyTo field is present (mapped from MQ ReplyToQ and ReplyToQMgr, both being required to support the CoD return flow).

如果没有特定设置,CAMEL 也默认假设出站连接上的请求-回复 EIP/MEP.CAMEL JMS/MQ 端点然后将等待 1 响应返回.当出站消息是 MQ 上的 JMS(因此带有 MQRFH2 标头)时,这可以正常工作.当强制使用普通 MQ 时,即删除如下 MQRFH2 标头时,我无法让端点侦听器匹配相关的传入 MQ 报告,尽管跟踪值似乎都是正确的(强制使用 24 个字符相关 ID,以便截断较长的 CorrelId 值或空填充by MQ 不能对相关过滤器进行地理分析).有没有人能够解决这个问题?

Without specific settings, CAMEL assumes too by default a Request-reply EIP / MEP on the outbound connection. The CAMEL JMS/MQ endpoint will then wait for 1 response back. When the OUTbound message is JMS over MQ (hence with MQRFH2 header), this works fine. When forcing plain vanilla MQ, i.e. removing MQRFH2 header as below, I was unable to get the endpoint listener match the correlated incoming MQ Report, although traced values seem all correct (enforcing 24 char correlation IDs so that truncation of longer CorrelId values or null padding by MQ cannot geopardize the correlation filter). Has anyone been able to solve that issue?

详细信息:虽然 IBM JMS API 接受传递特定的 JMS 属性值 WMQ_MESSAGE_BODY={1|0}/WMQ_TARGET_CLIENT={1|0} 来控制 JMS 标头 MQRFH2 在生成的消息中的存在,这些选项通过 CAMEL 变得无效.必须使用 CamelJmsDestinationName 标头(如 CAMEL JMS 文档中所述) 为目标提供一个 IBM 队列 URL,其中包含选项targetClient=1",以便去除 MQRFH2 标头.但是如果没有这个头,CoD 报告或 MQ 回复上的 CAMEL 关联就会失败.

Details: Although IBM JMS API accepts passing specific JMS property values WMQ_MESSAGE_BODY={1|0} / WMQ_TARGET_CLIENT={1|0} to control the presence of the JMS header MQRFH2 in the generated messages, these options become inoperative through CAMEL. One must use the CamelJmsDestinationName header (as explained in CAMEL JMS doc) to supply an IBM queue URL for the destination featuring the option "targetClient=1" in order to get rid of the MQRFH2 header. But without this header, the CAMEL correlation on the CoD Report or MQ reply does fail.

上述问题的解决方案确实是关于构建特定的 CAMEL 路由来处理远程方返回的 CoD 报告(以及相关的 MQ 回复).因此,CAMEL 出站消息必须强制为InOnly"ExchangePattern因此无需等待任何回复.但这会导致 CAMEL 抑制所有 ReplyTo 字段.然后,如果在出站 MQ 数据报上请求 MQ CoD,则会出现 CAMEL 异常,其原因是 MQException JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2027' ('MQRC_MISSING_REPLY_TO_Q').

The solution to the above issue is indeed about building a specific CAMEL route to handle the CoD Reports returned by the remote party (as well as correlated MQ Replies). So CAMEL outbound messages must be enforced as "InOnly" ExchangePattern and thus not wait for any reply. But this will cause CAMEL to suppress too all ReplyTo fields. Then if a MQ CoD is requested on an outbound MQ datagram, a CAMEL exception ensues whose cause is MQException JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2027' ('MQRC_MISSING_REPLY_TO_Q').

CAMEL 记录了一个 URI 选项 'disableReplyTo=true' 以禁用对交换模式的回复侦听,但保留 ReplyTo 字段 - 显然在入站和出站交换中.但是这个选项在出站 JMS 交换上不起作用(正如观察到的,我可能是错的),必须使用不太直观的preserveMessageQos"选项.

CAMEL documents an URI option 'disableReplyTo=true' to disable the reply-listening on exchange patterns yet keep the ReplyTo fields - apparently on both inbound and outbound exchanges. But this option does not work (as observed, I may be wrong) on outbound JMS exchanges and one must use the much less intuitive 'preserveMessageQos' option instead.

欢迎为这些问题提供优雅的解决方案.

Elegant solutions to these issues are welcome.

推荐答案

下面记录了我能得到的最好的东西,以 Spring XML 应用程序上下文为例,它本身承载着 CAMEL 上下文和路由.它与 IBM 本机 MQ JCA 兼容资源适配器 v7.5、CAMEL 2.15、Spring core 4.2 配合使用.我可以将它部署到 Glassfish 和 Weblogic 服务器.

The best I have been able to get is documented below, illustrated as a Spring XML application context that itself hosts the CAMEL context and routes. It works with the IBM native MQ JCA-compliant resource adapter v7.5, CAMEL 2.15, Spring core 4.2. I can deploy it to both Glassfish and Weblogic servers.

当然,考虑到众多变量,Java DSL 会在实际实现中使用.这个基于 CAMEL XML DSL 的示例是独立的且易于测试.

Of course a java DSL is used in a real implementation given the numerous variables. This example based on the CAMEL XML DSL is self-contained and easy to test.

我们从 Spring &骆驼声明:

We start with Spring & Camel declarations:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">

CAMEL 上下文遵循 2 条路线:MQ 到 JMS 和 JMS 到 MQ,此处链接以形成桥梁以简化测试.

The CAMEL context follows with 2 routes: MQ to JMS and JMS to MQ, here chained to form a bridge to ease testing.

<camel:camelContext id="mqBridgeCtxt">

  <camel:route id="mq2jms" autoStartup="true">

奇怪:当使用本机 MQ 资源适配器时,获得(例如)3 个侦听器的唯一方法是强制执行 3 个连接(依次使用 3 个 Camel:from 语句),每个连接最多 1 个会话,否则会出现 MQ 错误:MQJCA1018:每个连接只允许一个会话.但是,如果您改用 MQ 客户端 jar,则 CAMEL JMS 中的 concurentConsumers 选项确实可以正常工作.

Weird: when using the native MQ Resource adapter the only way to get (e.g.) 3 listeners is to enforce 3 connections (with 3 Camel:from statements in sequence) with max 1 session each, otherwise an MQ error ensues: MQJCA1018: Only one session per connection is allowed. However, if you use MQ client jars instead, then the concurentConsumers option in CAMEL JMS does work fine.

    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/>
    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/>
    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/>

上面的 disable disableReplyTo 选项可确保 CAMEL 在我们测试 MQ 消息类型为 1=Request(-reply) 或 8=datagram(一种方式!)之前不会产生回复.此处未说明该测试和回复结构.

The disable disableReplyTo option above ensures that CAMEL will not produce a reply before we can test the MQ message type to be 1=Request(-reply) or 8=datagram (one way!). That test and reply construction is not illustrated here.

然后我们在下次发布到普通 JMS 时将 EIP 强制为 InOnly,以与 Inbound MQ 模式保持一致.

Then we enforce the EIP to InOnly on the next posting to plain JMS to be consistent with the Inbound MQ mode.

    <camel:setExchangePattern pattern="InOnly"/>
    <!-- camel:process ref="reference to your MQ message processing bean fits here" / -->
    <camel:to uri="ref:innerQueue" />
  </camel:route>

接下来是 jms-to-MQ 路由:

Next comes the jms-to-MQ route:

  <camel:route id="jms2mq"  autoStartup="true">
    <camel:from uri="ref:innerQueue" />
    <!-- remove inner message headers and properties to test without inbound side effects! -->
    <camel:removeHeaders pattern="*"/>
    <camel:removeProperties pattern="*" />
    <!-- camel:process ref="reference to your MQ message preparation bean fits here" / -->

现在是远程目标返回的 MQ CoD 报告的请求标志.我们还强制 MQ 消息为数据报类型(值 8).

Now comes the request flag for the MQ CoD report to be returned by remote destination. We also enforce the MQ message to be of Datagram type (value 8).

    <camel:setHeader headerName="JMS_IBM_Report_COD"><camel:simple resultType="java.lang.Integer">2048</camel:simple></camel:setHeader>
    <camel:setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"><camel:simple resultType="java.lang.Integer">64</camel:simple></camel:setHeader>
    <camel:setHeader headerName="JMS_IBM_MsgType"><camel:simple resultType="java.lang.Integer">8</camel:simple></camel:setHeader>

可以通过 ReplyTo uri 选项指定 ReplyTo 队列,也可以作为标题指定,如下所示.

The ReplyTo queue can be specified either via the ReplyTo uri option, else as a header as below.

接下来我们确实使用 CamelJmsDestinationName 标头来强制抑制 JMS MQ 消息标头 MQRFH2(使用 targetClient MQ URL 选项值 1).换句话说,我们想要发送一个普通的 MQ 二进制消息(即,只有 MQMD 消息描述符后跟有效负载).

Next we do use CamelJmsDestinationName header to enforce suppressing of the JMS MQ message header MQRFH2 (using targetClient MQ URL option value 1). In other words, we want to send a plain vanilla MQ binary message (i.e. Only the MQMD message descriptor followed by the payload).

    <camel:setHeader headerName="JMSReplyTo"><camel:constant>TEST.REPLYTOQ</camel:constant></camel:setHeader>
    <camel:setHeader headerName="CamelJmsDestinationName"><camel:constant>queue://MYQMGR/TEST.Q2?targetClient=1</camel:constant></camel:setHeader>

更多MQMD字段可以通过保留 JMS 属性,如下图所示.请参阅限制 在 IBM 文档中.

More MQMD fields may be controlled through reserved JMS properties as illustrated below. See restrictions in IBM doc.

    <camel:setHeader headerName="JMS_IBM_Format"><camel:constant>MQSTR   </camel:constant></camel:setHeader>
    <camel:setHeader headerName="JMSCorrelationID"><camel:constant>_PLACEHOLDER_24_CHARS_ID_</camel:constant></camel:setHeader>

URI 中的目标队列被上面的 CamelJmsDestinationName 覆盖,因此 URI 中的队列名称成为占位符.

The destination queue in the URI is overwritten by the CamelJmsDestinationName above, hence the queue name in the URI becomes a placeholder.

URI 选项 preserveMessageQos 是 - 正如观察到的 - 允许发送带有 ReplyTo 数据的消息(以获取 MQ CoD 报告),但阻止 CAMEL 实例化 Reply 消息侦听器通过强制执行 InOnly MEP.

The URI option preserveMessageQos is the one that - as observed - allows sending a message with the ReplyTo data being set (to get the MQ CoD Report), yet prevent CAMEL to instantiate a Reply message listener by enforcing the InOnly MEP.

    <camel:to uri="wmq:queue:PLACEHOLDER.Q.NAME?concurrentConsumers=1&amp;
                exchangePattern=InOnly&amp;preserveMessageQos=true&amp;
                includeSentJMSMessageID=true" />
  </camel:route>
</camel:camelContext>

以下内容必须根据您的上下文进行调整.它为本地 JMS 提供程序和 Websphere MQ(通过本地 IBM WMQ JCA 资源适配器)提供队列工厂.我们在这里对管理对象使用 JNDI 查找.

The following must be adjusted to your context. It provides the queue factories for both a native JMS provider and Websphere MQ (via the native IBM WMQ JCA Resource Adapter). We use here JNDI lookups on administrative objects.

<camel:endpoint id="innerQueue" uri="jmsloc:queue:transitQueue">
</camel:endpoint>

<jee:jndi-lookup id="mqQCFBean" jndi-name="jms/MYQMGR_QCF"/>
<jee:jndi-lookup id="jmsraQCFBean" jndi-name="jms/jmsra_QCF"/>

<bean id="jmsloc" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="jmsraQCFBean" />
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="mqQCFBean" />
</bean>

</beans>

或者,如果您使用 MQ 客户端 jars 而不是资源适配器,您将像这样声明连接工厂 bean(代替上面的 JNDI 查找):

Alternatively, if you are using the MQ client jars instead of the Resource Adapter, you will declare the connection factory beans like (in place of JNDI lookups as above):

<bean id="mqCFBean" class="com.ibm.mq.jms.MQXAConnectionFactory">
    <property name="hostName" value="${mqHost}"/>
    <property name="port" value="${mqPort}"/>
    <property name="queueManager" value="${mqQueueManager}"/>
    <property name="channel" value="${mqChannel}"/>
    <property name="transportType" value="1"/> <!-- This parameter is fixed and compulsory to work with pure MQI java libraries -->
    <property name="appName" value="${connectionName}"/>
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="mqCFBean"/>
    <property name="transacted" value="true"/>
    <property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>

欢迎提出意见和改进.

这篇关于使用 CoD over Camel JMS 组件实现原生 websphere MQ的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-24 12:27