本文介绍了在Coamel JMS组件上使用CoD实现本机Websphere MQ的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我很难用Apache CAMEL实现一个Websphere MQ(WMQ)连接器,该连接器可以无一例外地处理MQ交付确认(CoD)报告,也不会产生不必要的响应数据报形式的副作用.最后,如果您习惯于编写本机MQ客户端,则可以按照我想要的方式进行操作,这是一种标准且通用的方法.我在同一个主题的帖子中记录了该方法,但是我发现该解决方案充满了复杂性,非常感谢任何建议或示例,可以使实现更简洁,更优雅.

I had much difficulties implementing a Websphere MQ (WMQ) connector with Apache CAMEL that could handle MQ Confirmation of Delivery (CoD) reports without exceptions, neither side effects in the form of unwanted response datagrams. In the end I got it working the way I wanted, a much standard and common way if you are accustomed to writing native MQ clients. I document the method in a post to this same topic, but I find the solution bloated with intricacies and would very much appreciate any advice or example to make the implementation cleaner and more elegant.

我了解到,问题的根源是MQ设计请求-应答消息交换模式(MEP)的方式,JMS规范的方式以及JMS组件中请求-应答MEP的CAMEL实现.三种不同的哲学!

I understood that the problem takes its roots into the way MQ designed request-reply Message Exchange Pattern (MEP), versus the way JMS specifications did, versus CAMEL implementation of the Request-Reply MEP in its JMS Component. Three different philosophies!

  1. WMQ具有MessageType标头(请参见 MQMD字段常量),其值1代表请求,2代表回复,8代表数据报(单向MEP).另外,值4用于标记报告消息,其形式为CoD(交付确认),PAN(正向确认)和NAN(负向确认),就消息流而言,它们还会做出附加的回复信息.可以使用另一个名为报告"的标头字段为请求消息,答复或数据报请求CoD,PAN和NAN确认,其中可以合并所有报告变量的标志.附加标头字段"ReplyToQ"和"ReplyToQMgr"指定原始发件人希望在其上进行报告和答复的队列和队列管理器,以及固定24字节"CorrelId"字段(可选),可以帮助进行关联报告并回复原始数据报或请求消息.为了使其更加复杂,实际上可以使用相同的原始消息ID而不返回CorrelID来发送回覆和报告,或者在CorrelId中提供原始消息ID,或者在原始Request或Datagram中已经指定时返回CorrelId值. IBM提供了通过WMQ的JMS API ,允许在WMQ上通过隧道传输普通的JMS交换(在额外的消息头名称 MQRFH2 ),或访问将本机MQ消息映射到JMS消息,反之亦然.
  2. 另一方面,JMS规范提供了一个可选的"JMSReplyTo"头字段和一个"JMSCorrelationID",但确实将确切的MEP语义留给了客户端应用程序.即在规格中说明:"响应可能是可选的;应由客户决定."
  3. CAMEL具有XML或Java DSL中的路由"功能,以及一个内部Exchange对象模型,旨在支持 EIP模式,其中包括请求-答复模式.然后,CAMEL在其 JMS组件中假定,如果设置了JMSReplyTo字段,则这必然是期望的请求回复,导致将Exchange的Out部分(如果Out为空,则修改为In部分)返回到JMSReplyTo中定义的队列.
  1. WMQ features a MessageType header (see MQMD fields and constants) that bears value 1 for request, 2 for reply, and 8 for datagram (one way MEP). In addition, the value 4 is used to mark Report messages in the form of CoD (Conf. of Delivery), PAN (Positive AckNowledge) and NAN (Negative AckNowledge), which - in term of the message flow - also make an additional Reply message. The CoD, PAN, and NAN acks can be requested for either Request messages, Replies or Datagrams using another header field named 'Report' in which flags for all report variants can be combined. Additional header fields 'ReplyToQ' and 'ReplyToQMgr' specify the Queue and Queue Manager on which Reports and Replies are expected by the original sender, and a fixed 24 bytes 'CorrelId' field - optional - can help correlate Reports and Replies to the original Datagram or Request mesage. To make it more complex, one can indeed send back Replies and Reports with the same original Message ID and no CorrelID, or provide the original Message ID in the CorrelId, or return the CorrelId value when already specified in the original Request or Datagram. IBM provides a JMS API over WMQ, allowing either to tunnel plain JMS exchanges over WMQ as transport (with help from an extra message header name MQRFH2), or to map native MQ messages onto JMS messages and vice-versa.
  2. On the other hand, JMS specs provide an optional 'JMSReplyTo' header field and a 'JMSCorrelationID', but do leave the exact MEP semantics to client applications; namely stating in specs: "A response may be optional; it is up to the client to decide."
  3. CAMEL features 'routes' in XML or Java DSL and a inner Exchange Object model with the purpose to support EIP Patterns amongst which the Request-Reply pattern. CAMEL then assumes in its JMS Component that if the JMSReplyTo field is set, this is necessarily a Request expecting a Reply, causing the Out part (or modified In part if Out is empty) of an Exchange to be returned to the queue defined in JMSReplyTo.

我愿意通过远程Websphere队列管理器通过传递确认(CoD)报告来支持本机MQ消息交换,因此,除了事务和持久性(即无损失,无重复)外,还可以跟踪何时消息已耗尽,并在出现延迟的情况下发出警报.

I was willing to support native MQ Message exchanges with Confirmation of Delivery (CoD) reports through a remote Websphere Queue Manager, so that, in addition to transaction and persistence (i.e. no loss, no duplicates) one can also track when a message is consumed and raise alerts in case of delays.

默认情况下,当队列中的消息使用完成时,Websphere队列管理器会生成CoD报告.因此,没有任何特定设置,当CAMEL终结点使用时,远程MQ客户端发送带有CoD标志的数据报(然后是强制性的ReplyToQ)将作为第一个MQ报告从队列管理器获得第一个答复.消息,然后是CAMEL明确返回的第二个(意外)回复消息,其中包含CAMEL路由末尾Exchange对象中剩余的内容,因为鉴于存在JMSReplyTo字段(映射),CAMEL假定为Request-Reply EIP来自MQ ReplyToQ和ReplyToQMgr,它们都是支持CoD返回流所必需的.

By default, the Websphere Queue Manager generates CoD Reports when message consumption from the queue completes. Therefore, without any specific settings, a remote MQ client sending a datagram with the CoD flag (and the then compulsory ReplyToQ) will get a first reply as MQ Report back from the Queue Manager when the CAMEL endpoint consumes the message, followed by a second (unexpected) reply message explicitly returned by CAMEL and containing whatever is left in the Exchange object at the end of the CAMEL route, because CAMEL assumes a Request-Reply EIP given that the JMSReplyTo field is present (mapped from MQ ReplyToQ and ReplyToQMgr, both being required to support the CoD return flow).

在没有特定设置的情况下,CAMEL在默认情况下也假定出站连接上的请求-答复EIP/MEP.然后,CAMEL JMS/MQ端点将等待 1 响应返回.当OUTbound消息是基于MQ的JMS时(因此带有MQRFH2标头),则工作正常.当强制使用普通的原始MQ时,即按如下所示删除MQRFH2标头时,尽管跟踪值似乎都正确(强制使用24个字符相关性ID,以便较长的CorrelId值被截断或填充为空),但我无法使端点侦听器与相关的传入MQ报告相匹配. MQ不能对关联过滤器进行地理解析).有人能解决这个问题吗?

Without specific settings, CAMEL assumes too by default a Request-reply EIP / MEP on the outbound connection. The CAMEL JMS/MQ endpoint will then wait for 1 response back. When the OUTbound message is JMS over MQ (hence with MQRFH2 header), this works fine. When forcing plain vanilla MQ, i.e. removing MQRFH2 header as below, I was unable to get the endpoint listener match the correlated incoming MQ Report, although traced values seem all correct (enforcing 24 char correlation IDs so that truncation of longer CorrelId values or null padding by MQ cannot geopardize the correlation filter). Has anyone been able to solve that issue?

详细信息:尽管IBM JMS API接受传递特定的JMS属性值WMQ_MESSAGE_BODY = {1 | 0}/WMQ_TARGET_CLIENT = {1 | 0}以控制所生成消息中JMS头MQRFH2的存在,这些选项将无法通过CAMEL起作用.必须使用 CamelJmsDestinationName 标头(如《 MELEL JMS文档》 ),以提供带有选项"targetClient = 1"的目标的IBM队列URL,以摆脱MQRFH2标头.但是如果没有此标头,则CoD报告或MQ答复上的CAMEL关联确实会失败.

Details: Although IBM JMS API accepts passing specific JMS property values WMQ_MESSAGE_BODY={1|0} / WMQ_TARGET_CLIENT={1|0} to control the presence of the JMS header MQRFH2 in the generated messages, these options become inoperative through CAMEL. One must use the CamelJmsDestinationName header (as explained in CAMEL JMS doc) to supply an IBM queue URL for the destination featuring the option "targetClient=1" in order to get rid of the MQRFH2 header. But without this header, the CAMEL correlation on the CoD Report or MQ reply does fail.

上述问题的解决方案确实是关于构建特定的CAMEL路由来处理远程方返回的CoD报告(以及相关的MQ答复).因此,CAMEL出站邮件必须强制为" InOnly " ExchangePattern 因此,您无需等待任何回复.但是,这将导致CAMEL抑制所有的ReplyTo字段.然后,如果在出站MQ数据报上请求MQ CoD,则会发生CAMEL异常,其原因为MQException JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2027' ('MQRC_MISSING_REPLY_TO_Q').

The solution to the above issue is indeed about building a specific CAMEL route to handle the CoD Reports returned by the remote party (as well as correlated MQ Replies). So CAMEL outbound messages must be enforced as "InOnly" ExchangePattern and thus not wait for any reply. But this will cause CAMEL to suppress too all ReplyTo fields. Then if a MQ CoD is requested on an outbound MQ datagram, a CAMEL exception ensues whose cause is MQException JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2027' ('MQRC_MISSING_REPLY_TO_Q').

CAMEL记录了一个URI选项"disableReplyTo = true",以禁用交换模式上的回复侦听,但保留ReplyTo字段-显然是在入站和出站交换上.但是,此选项在出站JMS交换上不起作用(据观察,我可能是错的),并且必须使用不太直观的"preserveMessageQos"选项.

CAMEL documents an URI option 'disableReplyTo=true' to disable the reply-listening on exchange patterns yet keep the ReplyTo fields - apparently on both inbound and outbound exchanges. But this option does not work (as observed, I may be wrong) on outbound JMS exchanges and one must use the much less intuitive 'preserveMessageQos' option instead.

欢迎为这些问题提供优雅的解决方案.

Elegant solutions to these issues are welcome.

推荐答案

下面已记录了我所能获得的最好的知识,并举例说明为本身承载CAMEL上下文和路由的Spring XML应用程序上下文.它可与IBM原生MQ JCA兼容资源适配器v7.5,CAMEL 2.15,Spring core 4.2一起使用.我可以将其部署到Glassfish服务器和Weblogic服务器.

The best I have been able to get is documented below, illustrated as a Spring XML application context that itself hosts the CAMEL context and routes. It works with the IBM native MQ JCA-compliant resource adapter v7.5, CAMEL 2.15, Spring core 4.2. I can deploy it to both Glassfish and Weblogic servers.

当然,在给定众多变量的情况下,Java DSL确实用于实际实现中.这个基于CAMEL XML DSL的示例是独立的,易于测试.

Of course a java DSL is used in a real implementation given the numerous variables. This example based on the CAMEL XML DSL is self-contained and easy to test.

我们从Spring&骆驼声明:

We start with Spring & Camel declarations:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">

CAMEL上下文遵循2条路由:从MQ到JMS和从JMS到MQ,在这里链接起来以形成简化测试的桥梁.

The CAMEL context follows with 2 routes: MQ to JMS and JMS to MQ, here chained to form a bridge to ease testing.

<camel:camelContext id="mqBridgeCtxt">

  <camel:route id="mq2jms" autoStartup="true">

怪异的:使用本机MQ资源适配器时,获取(例如)3个侦听器的唯一方法是强制执行3个连接(依次使用3个Camel:from语句),每个连接最多1个会话,否则会出现MQ错误:MQJCA1018: Only one session per connection is allowed.但是,如果改为使用MQ客户端jar,则CAMEL JMS中的concurentConsumers选项确实可以正常工作.

Weird: when using the native MQ Resource adapter the only way to get (e.g.) 3 listeners is to enforce 3 connections (with 3 Camel:from statements in sequence) with max 1 session each, otherwise an MQ error ensues: MQJCA1018: Only one session per connection is allowed. However, if you use MQ client jars instead, then the concurentConsumers option in CAMEL JMS does work fine.

    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/>
    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/>
    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/>

上面的disable disableReplyTo选项可确保CAMEL在我们测试MQ消息类型为1 = Request(-reply)或8 = datagram(单向!)之前不会产生答复.该测试和答复的构造未在此处说明.

The disable disableReplyTo option above ensures that CAMEL will not produce a reply before we can test the MQ message type to be 1=Request(-reply) or 8=datagram (one way!). That test and reply construction is not illustrated here.

然后,在下一次发布到纯JMS时,我们将EIP强制为InOnly,以与入站MQ模式保持一致.

Then we enforce the EIP to InOnly on the next posting to plain JMS to be consistent with the Inbound MQ mode.

    <camel:setExchangePattern pattern="InOnly"/>
    <!-- camel:process ref="reference to your MQ message processing bean fits here" / -->
    <camel:to uri="ref:innerQueue" />
  </camel:route>

接下来是从jms到MQ的路线:

Next comes the jms-to-MQ route:

  <camel:route id="jms2mq"  autoStartup="true">
    <camel:from uri="ref:innerQueue" />
    <!-- remove inner message headers and properties to test without inbound side effects! -->
    <camel:removeHeaders pattern="*"/>
    <camel:removeProperties pattern="*" />
    <!-- camel:process ref="reference to your MQ message preparation bean fits here" / -->

现在是远程目标要返回的MQ CoD报告的请求标志.我们还将MQ消息强制为数据报类型(值8).

Now comes the request flag for the MQ CoD report to be returned by remote destination. We also enforce the MQ message to be of Datagram type (value 8).

    <camel:setHeader headerName="JMS_IBM_Report_COD"><camel:simple resultType="java.lang.Integer">2048</camel:simple></camel:setHeader>
    <camel:setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"><camel:simple resultType="java.lang.Integer">64</camel:simple></camel:setHeader>
    <camel:setHeader headerName="JMS_IBM_MsgType"><camel:simple resultType="java.lang.Integer">8</camel:simple></camel:setHeader>

ReplyTo队列可以通过 ReplyTo uri选项指定,也可以如下所示作为标头.

The ReplyTo queue can be specified either via the ReplyTo uri option, else as a header as below.

接下来,我们确实使用 CamelJmsDestinationName 头来强制禁止JMS MQ消息头 MQRFH2 (使用targetClient MQ URL选项值1).换句话说,我们要发送普通的原始MQ二进制消息(即,仅MQMD消息描述符后跟有效负载).

Next we do use CamelJmsDestinationName header to enforce suppressing of the JMS MQ message header MQRFH2 (using targetClient MQ URL option value 1). In other words, we want to send a plain vanilla MQ binary message (i.e. Only the MQMD message descriptor followed by the payload).

    <camel:setHeader headerName="JMSReplyTo"><camel:constant>TEST.REPLYTOQ</camel:constant></camel:setHeader>
    <camel:setHeader headerName="CamelJmsDestinationName"><camel:constant>queue://MYQMGR/TEST.Q2?targetClient=1</camel:constant></camel:setHeader>

更多的MQMD字段可以通过保留了JMS属性,如下所示.请参阅限制在IBM文档中.

More MQMD fields may be controlled through reserved JMS properties as illustrated below. See restrictions in IBM doc.

    <camel:setHeader headerName="JMS_IBM_Format"><camel:constant>MQSTR   </camel:constant></camel:setHeader>
    <camel:setHeader headerName="JMSCorrelationID"><camel:constant>_PLACEHOLDER_24_CHARS_ID_</camel:constant></camel:setHeader>

URI中的目标队列被上面的 CamelJmsDestinationName 覆盖,因此URI中的队列名称成为一个占位符.

The destination queue in the URI is overwritten by the CamelJmsDestinationName above, hence the queue name in the URI becomes a placeholder.

如所观察到的那样,URI选项 preserveMessageQos 是允许发送设置了ReplyTo数据的消息(以获取MQ CoD报告),但阻止CAMEL实例化Reply消息侦听器的方法.通过执行InOnly MEP.

The URI option preserveMessageQos is the one that - as observed - allows sending a message with the ReplyTo data being set (to get the MQ CoD Report), yet prevent CAMEL to instantiate a Reply message listener by enforcing the InOnly MEP.

    <camel:to uri="wmq:queue:PLACEHOLDER.Q.NAME?concurrentConsumers=1&amp;
                exchangePattern=InOnly&amp;preserveMessageQos=true&amp;
                includeSentJMSMessageID=true" />
  </camel:route>
</camel:camelContext>

以下内容必须根据您的情况进行调整.它为本地JMS提供程序和Websphere MQ(通过本地IBM WMQ JCA资源适配器)提供了队列工厂.我们在这里在管理对象上使用JNDI查找.

The following must be adjusted to your context. It provides the queue factories for both a native JMS provider and Websphere MQ (via the native IBM WMQ JCA Resource Adapter). We use here JNDI lookups on administrative objects.

<camel:endpoint id="innerQueue" uri="jmsloc:queue:transitQueue">
</camel:endpoint>

<jee:jndi-lookup id="mqQCFBean" jndi-name="jms/MYQMGR_QCF"/>
<jee:jndi-lookup id="jmsraQCFBean" jndi-name="jms/jmsra_QCF"/>

<bean id="jmsloc" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="jmsraQCFBean" />
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="mqQCFBean" />
</bean>

</beans>

或者,如果您使用的是MQ客户端jar而不是资源适配器,则将声明连接工厂bean,例如(代替上面的JNDI查找):

Alternatively, if you are using the MQ client jars instead of the Resource Adapter, you will declare the connection factory beans like (in place of JNDI lookups as above):

<bean id="mqCFBean" class="com.ibm.mq.jms.MQXAConnectionFactory">
    <property name="hostName" value="${mqHost}"/>
    <property name="port" value="${mqPort}"/>
    <property name="queueManager" value="${mqQueueManager}"/>
    <property name="channel" value="${mqChannel}"/>
    <property name="transportType" value="1"/> <!-- This parameter is fixed and compulsory to work with pure MQI java libraries -->
    <property name="appName" value="${connectionName}"/>
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="mqCFBean"/>
    <property name="transacted" value="true"/>
    <property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>

欢迎评论和改进.

这篇关于在Coamel JMS组件上使用CoD实现本机Websphere MQ的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持!

08-24 12:27