现在的努力是为了将来不会无能为力

现在的努力是为了将来不会无能为力

为什么要使用RabbitMQ?

消息队列的作用

  • 异步调用
  • 系统解耦
  • 削峰限流
  • 消息通讯

消息队列的缺点

  • 系统可用性降低
  • 系统稳定性降低
  • 分布式一致性问题(可靠消息最终一致性的分布式事务方案解决)

RabbitMQ的优势

  • 支持高并发、高吞吐、性能好
  • 有完善的后台管理界面
  • 它还支持集群化、高可用部署架构、消息高可靠支持
  • RabbitMQ的开源社区很活跃,较高频率的迭代版本,来修复发现的bug以及进行各种优化
  • 最重要的是它是开源免费的。

RabbitMQ的缺点

  • 它是基于erlang语言开发的,所以导致较为难以分析里面的源码,也较难进行深层次的源码定制和改造,毕竟需要较为扎实的erlang语言功底才可以。

核心概念

  • Server:又称Broker,接受客户端的连接,实现AMQP实体服务;
  • Connection:连接,应用程序与Broker的网络连接;
  • Channel:网络通道,也称信道,几乎所有的通道都在Channel中进行的,Channel是进行消息读写的通道。客户端可建立多个Channel,每个Channel代表一个会话任务;是建立在“真实的”TCP连接内的虚拟连接。AMQP命令都是通过信道发送出去的。那么我们为什么需要信道呢?为什么不直接通过TCP连接发送AMQP命令呢?因为操作系统建立和销毁TCP会话是非常昂贵的开销,而且操作系统只能建立数量不多的TCP连接,很快就达到性能瓶颈,无法满足高性能的需求。
  • Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、等高级特性;Body则就是消息体的内容;
  • Virtual host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,并非物理概念。一个Virtual Host里面有若干个Exchange和Queue,同一个Virtual Host里面不能有相同名称的Exchange或者Queue;
  • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列;
    交换机的类型:direct:直连,fanout:广播,headers:以...开头,topic:主题
    参考:Spring Boot RabbitMQ 四种交换器
  • Binding:Exchange和Queue之间的虚拟连接,binding中可以包含routing key;
  • Routing Key:一个路由规则,虚拟机可用它来确定如何路由一个特定的消息;
  • Queue:也称为Message Queue,消息队列,保存消息并将它们转发给消费者。

几种交换机

  • Direct:直连交换机
    RabbitMQ系列之---初识RabbitMQ-LMLPHP
    所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue
    注意:Direct模式可以直接使用RabbitMQ自带的Exchange:default,并以队列名作为路由键。 Exchange,所以不需要将Exchange进行任何绑定(binding)操作,消息传递时,RouteKey必须完全匹配才会被队列接收,否则该消息会被抛弃。
public class Producer4DirectExchange {
    public static void main(String[] args) throws IOException, TimeoutException {
        ...
        Channel channel = connection.createChannel();

        String exchangeName = "test_direct_exchange";
        String routingKey = "test.direct";
        String msg = "Hello World RabbitMQ 4 Direct Exchange Message...";

        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        ...
    }
}
public class Consumer4DirectExchange {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
       ...
        Channel channel = connection.createChannel();

        String exchangeName = "test_direct_exchange";
        String exchangeType = "direct";
        String queueName = "test_direct_queue";
        String routingKey = "test.direct";

        // 声明交换机
        channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
        // 声明队列
        channel.queueDeclare(queueName, false, false, false, null);
        // 交换机与队列绑定
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.basicConsume(queueName,true,queueingConsumer);

        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息: " + msg);
        }
    }
}
  • Topic:主题交换机
    RabbitMQ系列之---初识RabbitMQ-LMLPHP
    所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定的Queue上,Exchange将RouteKey和某个Topic进行模糊匹配,此时队列需要绑定一个Topic。
    注意:可以使用通配符进行模糊匹配
"#" 匹配一个或多个词
"*" 匹配一个词
"." 是单词的分隔符

例如:
"log.#":能够匹配到"log.info.oa
"log.*":只会匹配到"log.error"
public class Producer4TopicExchange {
    public static void main(String[] args) throws IOException, TimeoutException {
        ...
        Channel channel = connection.createChannel();

        String exchangeName = "test_topic_exchange";
        String routingKey1 = "user.save";
        String routingKey2 = "user.update";
        String routingKey3 = "user.delete.ok";

        String msg = "Hello World RabbitMQ 4 Topic Exchange Message...";

        channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
        channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
        ...
    }
}
public class Consumer4TopicExchange {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ...
        Channel channel = connection.createChannel();

        String exchangeName = "test_topic_exchange";
        String exchangeType = "topic";
        String queueName = "test_topic_queue";
        String routingKey = "user.*";

        channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.basicConsume(queueName, true, queueingConsumer);

        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息: " + msg);
        }
    }
}
  • Fanout Exchange:广播交换机
    RabbitMQ系列之---初识RabbitMQ-LMLPHP
    不处理路由键,只需要简单的将队列绑定到交换机;
    发送到交换机的消息都会被转发到与该交换机绑定的所有队列上;
    Fanout交换机转发消息是最快的。
public class Producer4FanoutExchange {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ...
        Channel channel = connection.createChannel();

        String exchangeName = "test_fanout_exchange";
        String routingKey = "";

        String msg = "Hello World RabbitMQ 4 Fanout Exchange Message...";

        channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
        ...
    }
}
public class Consumer4FanoutExchange {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        ...
        Channel channel = connection.createChannel();

        String exchangeName = "test_fanout_exchange";
        String exchangeType = "fanout";
        String queueName = "test_fanout_queue";
        String routingKey = "";

        channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.basicConsume(queueName, true, queueingConsumer);

        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String msg = new String(delivery.getBody());
            System.out.println("收到消息: " + msg);
        }
    }
}

参考资料:

  1. 石杉大神消息中间件系列文章
  2. 慕课网《RabbitMQ消息中间件技术精讲》
  3. 书《RabbitMQ实战-高效部署分布式消息队列》
05-09 23:07