一、需求介绍
后端使用Spring Boot2.0框架,要实现IBM MQ的实时数据JMS监听接收处理,并形成回执通过MQ队列发送。
二、引入依赖jar包
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>4.3.18.RELEASE</version> </dependency> <dependency> <groupId>javax.jms</groupId> <artifactId>javax.jms-api</artifactId> </dependency> <dependency> <groupId>com.ibm.mq</groupId> <artifactId>com.ibm.mq.allclient</artifactId> <version>9.1.0.0</version> </dependency>
三、监听实现
代码中分为三大块:
1、MQ通道连接,我这边是用的用户名密码连接,如果非密码的可不入参
2、MQ的队列连接并实现监听
3、MQ发送
@Configuration public class MqTestConfig { @Autowired private MqProperties mqProperties; /**=======================MQ 通道工厂============================**/ @Bean(name="mqQueueConnectionFactory") public MQQueueConnectionFactory mqQueueConnectionFactory(){ MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory(); mqQueueConnectionFactory.setHostName(mqProperties.getHostName()); try { mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT); mqQueueConnectionFactory.setCCSID(mqProperties.getCcsid()); mqQueueConnectionFactory.setChannel(mqProperties.getChannel()); mqQueueConnectionFactory.setPort(mqProperties.getPort()); mqQueueConnectionFactory.setQueueManager(mqProperties.getQueueManager()); } catch (JMSException e) { e.printStackTrace(); } return mqQueueConnectionFactory; } @Bean(name="userCredentialsConnectionFactoryAdapter") public UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter(MQQueueConnectionFactory mqQueueConnectionFactory){ UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter(); userCredentialsConnectionFactoryAdapter.setUsername(mqProperties.getUserName()); userCredentialsConnectionFactoryAdapter.setPassword(mqProperties.getPassword()); userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(mqQueueConnectionFactory); return userCredentialsConnectionFactoryAdapter; } /**============================MQ 消息监听接收=============================**/ //队列连接 @Bean(name="mqueue") public MQQueue mqueue(){ MQQueue mqQueue = new MQQueue(); try { mqQueue.setBaseQueueName(mqProperties.getBaseQueueNameRecv()); mqQueue.setBaseQueueManagerName(mqProperties.getBaseQueueManagerName()); } catch (JMSException e) { e.printStackTrace(); } return mqQueue; } //对队列进行监听 @Bean(name="simpleMessageListenerContainer") public SimpleMessageListenerContainer simpleMessageListenerContainer(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter,MQQueue mqueue){ SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(); simpleMessageListenerContainer.setConnectionFactory(userCredentialsConnectionFactoryAdapter); simpleMessageListenerContainer.setDestination(mqueue); simpleMessageListenerContainer.setMessageListener(decMqRiskRecvService()); return simpleMessageListenerContainer; } //报文处理类 @Bean(name="decMqRiskRecvService") public DecMqRiskRecvService decMqRiskRecvService(){ return new DecMqRiskRecvService(); } /**============================MQ 发送消息============================**/ @Bean(name="cachingConnectionFactory") public CachingConnectionFactory cachingConnectionFactory(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter){ CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(); cachingConnectionFactory.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter); cachingConnectionFactory.setSessionCacheSize(5); cachingConnectionFactory.setReconnectOnException(true); return cachingConnectionFactory; } @Bean(name="jmsTransactionManager") public PlatformTransactionManager jmsTransactionManager(CachingConnectionFactory cachingConnectionFactory){ JmsTransactionManager jmsTransactionManager = new JmsTransactionManager(); jmsTransactionManager.setConnectionFactory(cachingConnectionFactory); return jmsTransactionManager; } @Bean(name="jmsOperations") public JmsOperations jmsOperations(CachingConnectionFactory cachingConnectionFactory){ JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory); jmsTemplate.setReceiveTimeout(mqProperties.getReceiveTimeout()); return jmsTemplate; } }
mq配置文件
记得要添加get和set方法
@Configuration @ConfigurationProperties(prefix=MqProperties.MQ_PREFIX) public class MqProperties { public static final String MQ_PREFIX = "mq"; private String hostName; private int port; private String channel; private int ccsid; private String userName; private String password; private String queueManager; private String baseQueueManagerName; private String baseQueueNameRecv; private String baseQueueNameSend; private long receiveTimeout; }
报文处理类及回执发送
1、实现类要实现MessageListener,重写onMessage方法,Message就是监听到的消息。
2、读取报文时为防止乱码,我这边按照格式分两种方式读取转码。
3、发送回执,之前发送发现报文多出了一些报文头信息,所以在队列信息加了
"queue:///" + mqProperties.getBaseQueueNameSend() + "?targetClient=1"
这样发送的报文会去掉报文头信息。
@Service public class DecMqRiskRecvService implements MessageListener { @Autowired private JmsOperations jmsOperations; @Autowired private MqProperties mqProperties; @Override public void onMessage(Message message) { String str = null; // 1、读取报文 try { if (message instanceof BytesMessage) { BytesMessage bm = (BytesMessage) message; byte[] bys = null; bys = new byte[(int) bm.getBodyLength()]; bm.readBytes(bys); str = new String(bys, "UTF-8"); } else { str = ((TextMessage) message).getText(); str = new String(str.getBytes("ISO-8859-1"), "UTF-8"); } } catch (JMSException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } // 2、处理报文 // 3、组装回执发送 String receipt = ""; try { jmsOperations.convertAndSend("queue:///" + mqProperties.getBaseQueueNameSend() + "?targetClient=1", receipt.getBytes("UTF-8")); } catch (JmsException e) { e.printStackTrace(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } }