我构建了一个针对JBoss 7.1.1和一个外部HornetQ实现2.2.1.4构建的Spring JMS解决方案。这已连接并成功工作。

但是,我现在正在使用EAP6,并试图连接到EAP6中打包的内部HornetQ。

我有一些用于管理连接和创建队列的类。但是似乎没有合适的方法可以连接到打包的HornetQ-与外部HornetQ的连接可以正常工作。

我已经用Redhat提出了这个问题,他们不确定如何解决,因为这也需要Spring编码。

我的问题是,我相信我需要创建一个QueueConnection,如QueueConnection qcon = queueConnectionFactory.createQueueConnection("user","password");

但是我们在Spring中实现它的方式是使用Spring JmsTemplate,并且无法向其添加队列连接的概念,因此它不起作用。

以下是包含必需的Spring Bean的jms-services.xml文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task" xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/context
                        http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/task
                        http://www.springframework.org/schema/task/spring-task.xsd
                        http://www.springframework.org/schema/tx
                        http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <!-- Enable annotation processing on registered beans. -->
    <context:annotation-config />

    <!-- Register annotated components found under com.myproject.test. -->
    <context:component-scan base-package="com.myproject.test" />

    <!-- Enable the task namespace's annotation-driven scheduling / async support. -->
    <task:annotation-driven />

    <!-- JTA transaction manager: the container TransactionManager is looked up under
         java:/TransactionManager; UserTransaction auto-detection is switched off. -->
    <bean id="testTransactionManager"
        class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManagerName" value="java:/TransactionManager"></property>
        <property name="autodetectUserTransaction" value="false"></property>
    </bean>

    <!-- Drive @Transactional through the JTA manager defined above. -->
    <tx:annotation-driven transaction-manager="testTransactionManager" />

    <!-- Wrapper bean that builds the HornetQ ConnectionFactory. With useCluster=true the
         host/port pair is handed to a HornetQ DiscoveryGroupConfiguration, so 231.7.7.7:9876
         is the discovery group address/port rather than a broker address. useJta=true selects
         the XA factory type. Note: this bean is NOT itself a javax.jms.ConnectionFactory; the
         real factory is exposed through its connectionFactory property. -->
    <bean id="queueConnectionFactory" class="com.myproject.test.impl.QueueConnectionFactoryImpl">
        <constructor-arg type="String" name="host" value="231.7.7.7" />
        <constructor-arg type="int" name="port" value="9876" />
        <constructor-arg type="boolean" name="useJta" value="true" />
        <constructor-arg type="boolean" name="useCluster" value="true" />
    </bean>

    <!-- Queue manager: creates a JmsTemplate on top of the factory above and uses
         "TestQueue" as the template's default destination. -->
    <bean id="testQueueManager" class="com.myproject.test.impl.QueueManagerImpl">
        <constructor-arg ref="queueConnectionFactory" />
        <constructor-arg name="queue" value="TestQueue" />
    </bean>

</beans>


这是我的QueueConnectionFactoryImpl类:

package com.myproject.test.impl;

import java.util.HashMap;
import java.util.Map;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

import org.hornetq.api.core.DiscoveryGroupConfiguration;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.jboss.logging.Logger;

import com.myproject.test.QueueConnectionFactory;

/**
 * Builds and holds the HornetQ JMS {@link ConnectionFactory} used by the queue managers.
 *
 * <p>The factory is created either against a single Netty connector (host/port of one
 * broker) or, when clustering is requested, through a HornetQ discovery group (in which
 * case {@code host}/{@code port} are the discovery group address and port). When
 * {@code useJta} is set, an XA factory type is requested instead of the plain one.
 *
 * <p>Note that this class is a holder, not a {@code javax.jms.ConnectionFactory} itself;
 * callers obtain the real factory through {@link #getConnectionFactory()}.
 */
public class QueueConnectionFactoryImpl implements QueueConnectionFactory {

    // Initialised eagerly so every constructor path has a usable logger.
    private final Logger logger = Logger.getLogger(this.getClass());

    private String host;
    private int port;
    private ConnectionFactory connectionFactory;
    private boolean useJta = false;

    /**
     * Creates a non-clustered factory for a single broker.
     *
     * @param host   broker host
     * @param port   broker Netty acceptor port
     * @param useJta {@code true} to request an XA connection factory
     */
    public QueueConnectionFactoryImpl(String host, int port, boolean useJta) {
        this.useJta = useJta;
        createConnection(host, port);
    }

    /**
     * Creates a non-clustered, non-XA factory for a single broker.
     *
     * @param host broker host
     * @param port broker Netty acceptor port
     */
    public QueueConnectionFactoryImpl(String host, int port) {
        createConnection(host, port);
    }

    /**
     * Creates either a clustered (discovery-group based) or a single-broker factory.
     *
     * @param host       broker host, or the discovery group address when {@code useCluster} is set
     * @param port       broker port, or the discovery group port when {@code useCluster} is set
     * @param useJta     {@code true} to request an XA connection factory
     * @param useCluster {@code true} to create the factory through a discovery group
     */
    public QueueConnectionFactoryImpl(String host, int port, boolean useJta, boolean useCluster) {
        this.useJta = useJta;
        if (useCluster) {
            createClusterConnection(host, port);
        } else {
            createConnection(host, port);
        }
    }

    /**
     * Wraps an externally supplied factory (for example one obtained from JNDI).
     *
     * <p>Previously the argument was only logged and then discarded, which left
     * {@link #getConnectionFactory()} returning {@code null}. The argument is now kept
     * when it actually is a {@link ConnectionFactory}; any other object is still only
     * logged, as before.
     *
     * @param connectionFactory the object to wrap
     */
    public QueueConnectionFactoryImpl(Object connectionFactory) {
        logger.debug("Object is: " + connectionFactory);
        if (connectionFactory instanceof ConnectionFactory) {
            this.connectionFactory = (ConnectionFactory) connectionFactory;
        }
    }

    /** Creates a factory bound to one Netty connector at {@code host:port} (no HA). */
    private void createConnection(String host, int port) {
        this.host = host;
        this.port = port;

        Map<String, Object> connectionParams = new HashMap<String, Object>();
        connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
        connectionParams.put(TransportConstants.HOST_PROP_NAME, host);

        TransportConfiguration transportConfiguration =
                new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);

        connectionFactory = (ConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(
                resolveFactoryType(), transportConfiguration);
    }

    /** Creates an HA factory that locates brokers through the discovery group {@code host:port}. */
    private void createClusterConnection(String host, int port) {
        this.host = host;
        this.port = port;

        connectionFactory = (ConnectionFactory) HornetQJMSClient.createConnectionFactoryWithHA(
                new DiscoveryGroupConfiguration(host, port), resolveFactoryType());
    }

    /** Picks the XA factory type when JTA is in use, the plain one otherwise. */
    private JMSFactoryType resolveFactoryType() {
        return useJta ? JMSFactoryType.XA_CF : JMSFactoryType.CF;
    }

    public String getHost() {
        return host;
    }

    /** Updates the stored host only; the already-built factory is not recreated. */
    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    /** Updates the stored port only; the already-built factory is not recreated. */
    public void setPort(int port) {
        this.port = port;
    }

    /** @return the underlying JMS connection factory, or {@code null} if none was created */
    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public boolean isUseJta() {
        return useJta;
    }

    /** Updates the flag only; the already-built factory keeps the type it was created with. */
    public void setUseJta(boolean useJta) {
        this.useJta = useJta;
    }

}


这是我的QueueManagerImpl代码

package com.myproject.test.impl;

import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Queue;

import org.apache.log4j.Logger;
import org.hornetq.api.jms.HornetQJMSClient;
import org.springframework.jms.core.JmsTemplate;

import com.myproject.test.QueueManager;

/**
 * Owns a Spring {@link JmsTemplate} configured for one HornetQ queue.
 *
 * <p>The template is built from the JMS factory exposed by the supplied
 * {@code QueueConnectionFactory}, sends persistent messages with explicit QoS, and uses
 * transacted sessions when the factory reports JTA usage. Configuration problems are
 * logged rather than thrown, so construction is best-effort.
 *
 * <p>NOTE(review): {@code QueueConnectionFactory} is not among the imports visible for this
 * class; presumably it is {@code com.myproject.test.QueueConnectionFactory} - confirm.
 */
public class QueueManagerImpl implements QueueManager {

    private static final Logger log = Logger.getLogger(QueueManagerImpl.class);

    private String queue;
    private ConnectionFactory connectionFactory;
    private JmsTemplate template;
    private Queue jmsQueue;
    private boolean useJta = false;

    /**
     * Creates the template without a default destination.
     *
     * @param queueConnectionFactory holder of the JMS connection factory and the JTA flag
     */
    public QueueManagerImpl(QueueConnectionFactory queueConnectionFactory) {

        template = new JmsTemplate();
        connectionFactory = queueConnectionFactory.getConnectionFactory();
        try {
            // Assign the field directly: calling the overridable setter from a
            // constructor would run subclass code on a half-built object.
            this.useJta = queueConnectionFactory.isUseJta();
            template.setConnectionFactory(connectionFactory);
            template.setExplicitQosEnabled(true);
            template.setDeliveryMode(DeliveryMode.PERSISTENT);
            if (this.useJta) {
                template.setSessionTransacted(true);
            }
        } catch (Exception ex) {
            // Deliberately best-effort: report the problem but keep the bean usable.
            logFailure(ex);
        }
    }

    /**
     * Creates the template and sets its default destination.
     *
     * @param queueConnectionFactory holder of the JMS connection factory and the JTA flag
     * @param queue                  name of the queue used as default destination
     */
    public QueueManagerImpl(QueueConnectionFactory queueConnectionFactory, String queue) {

        this(queueConnectionFactory);
        setQueue(queue);
    }

    public String getQueue() {
        return queue;
    }

    /**
     * Resolves {@code queue} to a HornetQ queue and makes it the template's default
     * destination. On failure the error is logged and the previous queue name is kept.
     *
     * @param queue name of the queue
     */
    public void setQueue(String queue) {
        try {
            jmsQueue = HornetQJMSClient.createQueue(queue);
            template.setDefaultDestination(jmsQueue);
            this.queue = queue;
        } catch (Exception ex) {
            logFailure(ex);
        }
    }

    public JmsTemplate getTemplate() {
        return template;
    }

    /**
     * Logs a connection problem.
     *
     * <p>The previous implementation formatted the details and then passed them to a
     * format string without any placeholder, so only a fixed text reached the log. The
     * details are now part of the logged message.
     *
     * @param error description of the problem
     */
    public void logError(String error) {
        log.error(describe(error));
    }

    /** Logs a connection problem together with the exception, so the stack trace is kept. */
    private void logFailure(Exception ex) {
        log.error(describe(ex.toString()), ex);
    }

    /** Builds the log message for a connection problem. */
    private static String describe(String error) {
        return String.format("Unable to connect to queue, details: %s ", error);
    }

    @Override
    public boolean isUseJta() {
        return useJta;
    }

    @Override
    public void setUseJta(boolean useJta) {
        this.useJta = useJta;
    }
}




最主要的是,以上代码需要将ConnectionFactory对象传递给QueueManagerImpl中的JmsTemplate,即template.setConnectionFactory(connectionFactory);。

我尝试了几种方法来使其工作:

1)将以下内容添加到jms-services.xml文件中:

<!-- Attempt 1 (does not work): targetConnectionFactory must be a javax.jms.ConnectionFactory,
     but the queueConnectionFactory bean is the QueueConnectionFactoryImpl wrapper, so Spring
     cannot convert it and fails with the IllegalStateException quoted below.
     NOTE(review): referencing the wrapped factory instead, e.g. via a SpEL value such as
     #{queueConnectionFactory.connectionFactory}, would presumably satisfy the type; confirm. -->
<bean id="myConnectionFactory" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
    <property name="targetConnectionFactory" ref="queueConnectionFactory"/>
    <property name="username" value="myuser"/>
    <property name="password" value="myuser123"/>
</bean>


这会抛出以下异常:


java.lang.IllegalStateException:无法将属性'targetConnectionFactory'的[com.myproject.test.impl.QueueConnectionFactoryImpl]类型的值转换为所需的类型[javax.jms.ConnectionFactory]:未找到匹配的编辑器或转换策略


2)将QueueConnectionFactoryImpl中的连接更改为:

// Attempt 2 (does not work): createConnection(user, password) returns a JMS connection,
// not a factory, so the cast to ConnectionFactory below fails at runtime with the
// ClassCastException quoted underneath.
org.hornetq.jms.client.HornetQConnectionFactory HQConnectionFactory = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(jmsFType, transportConfiguration);

try {
    // The result of createConnection(...) is a connection object, not a ConnectionFactory.
    connectionFactory = (ConnectionFactory) HQConnectionFactory.createConnection("myuser","myuser123");
} catch (JMSException e) {
    // Only JMSException is handled here; the exception is just printed, not rethrown.
    e.printStackTrace();
}


这也不起作用。我得到了如下异常:


java.lang.ClassCastException:org.hornetq.jms.client.HornetQConnection无法转换为javax.jms.ConnectionFactory




简而言之,有没有人能帮我修改上述代码,使其在连接HornetQ时能够提供用户名和密码,同时我仍然可以继续使用JmsTemplate?

最佳答案

只需使用您自己的连接工厂即可。这种方式对我有效:

Spring bean(5445是hornetq接受器端口):

<!-- Connection factory that always connects with the configured credentials.
     The constructor-arg names must match the Java constructor parameter names
     (ha, commaSepratedServerUrls); the second one is a comma-separated list of
     host:port entries. username/password are injected through the setters. -->
<bean name="jmsConnectionFactory" class="messaging.jms.CustomHornetQJMSConnectionFactory">
    <constructor-arg name="ha" value="false" /> <!-- set true if you want support failover -->
    <constructor-arg name="commaSepratedServerUrls" value="127.0.0.1:5445" />
    <property name="username" value="admin" />
    <property name="password" value="admin" />
</bean>


连接工厂实现(其中HornetQJMSConnectionFactory来自hornetq-jms-client,TransportConfiguration来自hornetq-core-client):

/**
 * HornetQ JMS connection factory that authenticates every connection created through
 * {@link #createConnection()} with a configured username and password.
 *
 * <p>This lets clients that only ever call the no-argument {@code createConnection()}
 * (such as Spring's {@code JmsTemplate}) talk to a secured broker. The brokers are given
 * as a comma-separated list of {@code host:port} entries.
 */
public class CustomHornetQJMSConnectionFactory extends org.hornetq.jms.client.HornetQJMSConnectionFactory
{
    private static final long serialVersionUID = 1L;

    private String username;
    private String password;

    /**
     * @param ha                      passed through to the superclass; {@code true} enables failover support
     * @param commaSepratedServerUrls comma-separated {@code host:port} entries, e.g. {@code "127.0.0.1:5445"}
     * @throws IllegalArgumentException if an entry is not a valid {@code host:port} pair
     */
    public CustomHornetQJMSConnectionFactory(boolean ha, String commaSepratedServerUrls)
    {
        super(ha, converToTransportConfigurations(commaSepratedServerUrls));
    }

    /**
     * Converts a comma-separated list of {@code host:port} entries into Netty transport
     * configurations, one per entry and in the same order.
     *
     * <p>Whitespace around entries, hosts and ports is ignored. Malformed entries are
     * rejected with a descriptive exception instead of an
     * {@code ArrayIndexOutOfBoundsException}, and the port is stored as an integer.
     *
     * @param commaSepratedServerUrls comma-separated {@code host:port} entries
     * @return one transport configuration per entry
     * @throws NullPointerException     if {@code commaSepratedServerUrls} is {@code null}
     * @throws IllegalArgumentException if an entry lacks a host or a numeric port
     */
    public static TransportConfiguration[] converToTransportConfigurations(String commaSepratedServerUrls)
    {
        if (commaSepratedServerUrls == null)
        {
            throw new NullPointerException("commaSepratedServerUrls must not be null");
        }

        String[] serverUrls = commaSepratedServerUrls.split(",");
        TransportConfiguration[] transportconfigurations = new TransportConfiguration[serverUrls.length];
        for (int i = 0; i < serverUrls.length; i++)
        {
            String serverUrl = serverUrls[i].trim();
            String[] urlParts = serverUrl.split(":");
            if (urlParts.length < 2 || urlParts[0].trim().isEmpty())
            {
                throw new IllegalArgumentException(
                        "Expected server URL in the form host:port but got '" + serverUrl + "'");
            }

            String host = urlParts[0].trim();
            int port;
            try
            {
                port = Integer.parseInt(urlParts[1].trim());
            }
            catch (NumberFormatException e)
            {
                throw new IllegalArgumentException(
                        "Invalid port in server URL '" + serverUrl + "'", e);
            }

            HashMap<String, Object> params = new HashMap<String, Object>();
            params.put(TransportConstants.HOST_PROP_NAME, host);
            params.put(TransportConstants.PORT_PROP_NAME, Integer.valueOf(port));
            transportconfigurations[i] = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
        }
        return transportconfigurations;
    }

    /**
     * Creates a connection using the configured {@code username} and {@code password}
     * instead of an anonymous one.
     */
    @Override
    public Connection createConnection() throws JMSException
    {
        return super.createConnection(username, password);
    }

    public String getUsername()
    {
        return username;
    }

    public void setUsername(String username)
    {
        this.username = username;
    }

    public String getPassword()
    {
        return password;
    }

    public void setPassword(String password)
    {
        this.password = password;
    }
}


现在,只要把这个连接工厂提供给jmsTemplate,就可以使用用户名/密码来发送和消费消息了。

09-30 22:49