前面的学习都是基于原生的api,下面我们使用spingboot来整合rabbitmq

springboot对rabbitmq提供了友好支持,极大的简化了开发流程

引入maven



<
dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>

配置yml


rabbitmq:
    host: 47.102.103.232
    port: 5672
    username: admin
    password: admin
    virtual-host: /test
    publisher-confirms: true
    publisher-returns: true
    cache:
      channel:
        size: 10
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 1
        max-concurrency: 3
        retry:
          enabled: true

这是基础的配置,看不懂的配置后面会介绍

更详细的配置参考官方https://docs.spring.io/spring-boot/docs/2.1.3.RELEASE/reference/htmlsingle/#boot-features-rabbitmq(搜索rabbit往下拉即可)

代码实现


 配置类

@Configuration
public class RabbitConfig {
    @Bean
    public Queue helloQueue() {
        return new Queue("helloQueue");
    }
   //创建topic交换机 @Bean
public TopicExchange helloExchange() { return new TopicExchange("helloExchange"); } @Bean public Binding bindingPaymentExchange(Queue helloQueue, TopicExchange helloExchange) { return BindingBuilder.bind(helloQueue).to(helloExchange).with("hello.#"); } /** * 定制化amqp模版
   * connectionFactory:包含了yml文件配置参数
*/ @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 必须设置为 true,不然当 发送到交换器成功,但是没有匹配的队列,不会触发 ReturnCallback 回调 // 而且 ReturnCallback 比 ConfirmCallback 先回调,意思就是 ReturnCallback 执行完了才会执行 ConfirmCallback rabbitTemplate.setMandatory(true); // 设置 ConfirmCallback 回调 yml需要配置 publisher-confirms: true rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {        // 如果发送到交换器都没有成功(比如说删除了交换器),ack 返回值为 false // 如果发送到交换器成功,但是没有匹配的队列(比如说取消了绑定),ack 返回值为还是 true (这是一个坑,需要注意) if (ack) { String messageId = correlationData.getId(); System.out.println("confirm:"+messageId); } }); // 设置 ReturnCallback 回调 yml需要配置 publisher-returns: true // 如果发送到交换器成功,但是没有匹配的队列,就会触发这个回调 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String messageId = message.getMessageProperties().getMessageId(); System.out.println("return:"+messageId); }); return rabbitTemplate; } }
回调机制
  • 消息不管是否投递到交换机都进行ConfirmCallback回调,投递成功ack=true,否则为false
  • 交换机匹配到队列成功则不进行ReturnCallback回调,否则先进行ReturnCallback回调再进行ConfirmCallback回调
  • 如果消息成功投递到交换机,但没匹配到队列,则ConfirmCallback回调ack仍为true

生产者

@Component
public class RbProducer {
    //注意一定要使用RabbitTemplate!!
    //虽然RabbitTemplate实现了AmqpTemplate 但是AmqpTemplate里并没有能发送correlationData的方法
    @Resource
    private RabbitTemplate rbtemplate;
    public void send1(String msg){
        //CorrelationData用于confirm机制里的回调确认
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rbtemplate.convertAndSend("helloExchange", "hello.yj", msg,correlationData);
    }
    public void send2(User user){
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rbtemplate.convertAndSend("helloExchange", "hello.yj", user,correlationData);
    }
}

消费者

@Component
@RabbitListener(queues = "helloQueue")
public class RbConsumer {
    @RabbitLister(queues = "helloQueue")
    public void receive0(Message msg,  Channel channel) throws IOException {
        System.out.println("consumer receive message0: " + msg);
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
    }
    @RabbitHandler
    public void receive1(String msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException {
        System.out.println("consumer receive message1: " + msg);
        channel.basicAck(deliveryTag, false);
    }
    @RabbitHandler
    public void receive2(User user, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException {
        System.out.println("consumer receive message2: "+user);
     //如果发生以下情况投递消息所有的通道或连接被突然关闭(包括消费者端丢失TCP连接、消费者应用程序(进程)挂掉、通道级别的协议异常)任何已经投递的消息但是没有被消费者端确认的消息会自动重新排队。
        //请注意,连接检测不可用客户端需要一段时间才会发现,所以会有一段时间内的所有消息会重新投递
        //因为消息的可能重新投递,所有必须保证消费者端的接口的幂等。

        //在RabbitMQ中影响吞吐量最大的参数是:消息确认模式和Qos预取值
        //自动消息确认模式或设置Qos预取值为无限虽然可以最大的提高消息的投递速度,但是在消费者端未及时处理的消息的数量也将增加,从而增加消费者RAM消耗,使用消费者端奔溃。所以以上两种情况需要谨慎使用。
        //RabbitMQ官方推荐Qos预取值设置在 100到300范围内的值通常提供最佳的吞吐量,并且不会有使消费者奔溃的问题
        channel.basicAck(deliveryTag, false);
        channel.basicQos(100);
        // 代表消费者拒绝一条或者多条消息,第二个参数表示一次是否拒绝多条消息,第三个参数表示是否把当前消息重新入队
        // channel.basicNack(deliveryTag, false, false);
        // 代表消费者拒绝当前消息,第二个参数表示是否把当前消息重新入队
        // channel.basicReject(deliveryTag,false);
    }
}

@RabbitListener+@RabbitHandler:消费者监听

  使用@RabbitListener+@RabbitHandler组合进行监听,监听器会根据队列发来的消息类型自动选择处理方法

channel.basicAck(deliveryTag, false):手动确认机制

  deliverTag:该消息的标识,每来一个消息该标识+1

  multiple:第二个参数标识书否批量确认

  requeue:被拒绝的是否重新入队

channel.basicQos(100):最多未确认的消息数量为100,超过100队列将停止给该消费者投递消息

更多参数详解参考https://www.cnblogs.com/piaolingzxh/p/5448927.html

测试



@RunWith(SpringRunner.
class) @SpringBootTest(classes = TestBoot.class) public class TestRabbit { @Resource private RbProducer producer; @Test public void send1() { producer.send1("hello,im a string"); } @Test public void send2() { User user = new User(); user.setNickname("hello,im a object"); producer.send2(user); } }

 成功消费

rabbitmq学习(七) —— springboot下的可靠使用-LMLPHP

rabbitmq学习(七) —— springboot下的可靠使用-LMLPHP

完结

下篇博客我们讨论下在拥有了手动ack机制、confirm机制、return机制后,是否真的可靠~

03-14 15:46