接着上一讲 消息中间件之RabbitMQ初识,这笔我们来讲讲RabbitMQ中消息丢失的问题。已经怎样在核心业务中避免消息丢失。

血泪故事:商品购物流程中的发货环节引入了RabbitMQ,某天由于网络抖动导致了生产者的消息没有发送到RabbitMQ中,由于没有做消息的可靠性传输保证,消息丢失,导致一批客户迟迟没收到货物而引发投诉,给公司造成了不小的损失。

为了避免上述悲剧重演,我们来了解下在RabbitMQ中我们需要怎样保证消息不丢失。

消息丢失会发生在什么时候

消息的传输过程大致如下图

RabbitMQ消息可靠性传输-LMLPHP

消息丢失可能发生在

  • Producer端 发送到RabbitMQ中由于网络异常或者服务异常导致消息发送失败。
  • RabbitMQ服务端 异常或者重启导致消息丢失。
  • Consumer端 接收到消息后,消息处理失败,消息丢失。

当然上一讲中有提到在RabbitMQ,生产者发送消息是和Exchange交互,Exchange根据路由规则投递到具体的Queue中,如果路由规则设置有问题,也会导致消息丢失,但此条不在本文讨论重点。

Producer 消息可靠性保证

为了避免由于网络抖动或者RabbitMQ服务端异常导致消息发送失败的问题。可以在Producer发送消息的使用引入了一个确认机制(ack),服务端接收到消息之后,会返回给Producer一个成功或者失败的确认消息。

RabbitMQ提供了两种解决方式:

  • 事务机制
  • 发送方确认机制

事务方式,主要方法有以下几个

  • channel.txSelect() 将当前的channel设置成事务模式。
  • channel.txCommit()用于提交事务。
  • channel.txRollback()用于事务回滚

下面代码是简单示例

try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
//发送失败后续处理,重发或者持久化异常消息稍后重试
}

信号的流转过程如下图

RabbitMQ消息可靠性传输-LMLPHP

图片来源 RabbitMQ实战指南

如果事务能够提交成功,则消息一定到达了RabbitMQ中。

RabbitMQ消息可靠性传输-LMLPHP

图片来源 RabbitMQ实战指南

事务机制能够解决消息生产者和RabbitMQ之间消息 确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功。但事务机制是同步阻塞进行的,回大大降低RabbitMQ的吞吐量,RabbitMQ提供了一种改进方案,即发送方确认机制。

发送方确认机制:

  • channel.confirmSelect(); 将通道设置确认机制
  • channel.addConfirmListener() 为通道添加ConfirmListener这个回调接口。
  • com.rabbitmq.client.ConfirmListener#handleAck 回调处理正常被RabbitMQ接收的消息。
  • com.rabbitmq.client.ConfirmListener#handleNack回调处理没有被RabbitMQ正常接收的消息。
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
	public void handleAck(long deliveryTag, boolean multiple) throws IOException {
		if (multiple) {
			confirmSet.headSet(deliveryTag + 1).clear();
		} else {
			confirmSet.remove(deliveryTag);
		}
	}
	public void handleNack(long deliveryTag, boolean multiple) throws IOException {
		System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
		if (multiple) {
			confirmSet.headSet(deliveryTag + 1).clear();
		} else {
			confirmSet.remove(deliveryTag);
		}
		//这里需要添加消息发送失败处理的代码,重新发送或者持久化后补偿。
	}
});
//模拟一直发送消息的场景
while (true) {
	long nextSeqNo = channel.getNextPublishSeqNo();
	channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
	confirmSet.add(nextSeqNo);
}

上面例子演示了异步confirm的形式,在保证生产者消息被RabbitMQ正常接收,又没有同步阻塞导致明显降低RabbitMQ吞吐量的问题。

RabbitMQ端

为避免RabbitMQ服务异常或者重启导致的消息丢失,需要对做持久化操作,将相关信息保存到磁盘上。要保证消息不丢失需要持久化主队列、持久化。exchange不持久化,在RabbitMQ服务重启后,相关的exchange元数据会丢失,不过消息不丢失,但消息不能发送到这个exchange中了。

  • 队列持久化需要在声明队列的时候将durable参数设置为true。(因为消息是存在与队列中,如果队列不持久化,那RabbitMQ重启后,消息将丢失)
  • 消息持久化通过将投递模式设置成2(BasicProperties中的deliveryMode)。
channel.queueDeclare(QUEUE_NAME,true,//durable
                     false,false,null);
channel.basicPublish("",QUEUE_NAME,
                     MessageProperties.PERSISTENT_TEXT_PLAIN,//具体属性见下面
                     message.getBytes(StandardCharsets.UTF_8));
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
					null,
					null,
					2, //deliveryMode
					0, null, null, null,
					null, null, null, null, null, null);

Consumer端

为保证Consumer端不因消费处理异常或消费者应用重启导致消息丢失。我们需要如下操作

  • 关闭默认的自动确认。设置为手动确认模式。

当设置为手动确认模式,对于RabbitMQ服务端而言队列中的消息分为了两种

  • Ready:等待投递给消费者的消息。
  • Unacked:已经投递给消费者,但还没有收到消费者确认新号的消息。

对于Unacked消息,会出现下面几种情况:

  • RabbitMQ收到持有消息的消费者的ack信号,RabbitMQ服务端将会删除该消息。
  • RabbitMQ服务端收到持有消息的消费者nack/reject信号,requeue参数为true,RabbitMQ会重新将这条消息存入队列。
  • RabbitMQ服务端收到持有消息的消费者nack/reject信号,requeue参数为false,如果队列配置了死信队列,则消息进入死信队列,如果没有配置死信队列,则消息被RabbitMQ从队列中删除。
  • RabbitMQ服务端没有收到消息持有消费者的确认信号,且消费此消息的消费者没有断开连接,则服务端会一直等待,没有超时时间。
  • RabbitMQ服务端没有收到消息持有消费者的确认信号,且消费此消息的消费者已经断开连接,RabbitMQ会安排该消息重新进入队列。

消息拒绝可以使用Channel类中的basicReject或者basicNack方法,下面我们来看下他们之间的差异。

void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

void basicReject(long deliveryTag, boolean requeue) throws IOException;
  • deliveryTag:64位的长整型值,作为消息的编号。
  • requeue:是否重入队列配置项。
  • multiple:是否批量处理未被当前消费者确认的消息。

我们来看一个代码示例:

boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
     new DefaultConsumer(channel) {
         @Override
         public void handleDelivery(String consumerTag,
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body)
             throws IOException
         {
             long deliveryTag = envelope.getDeliveryTag();
			 try{
				//消息处理业务逻辑处理
				channel.basicAck(deliveryTag, false);
			 }catch(Exception e){
                 //处理失败处理逻辑
				channel.basicReject(deliveryTag, false);
			 }
         }
     });

通过手动确认模式,RabbitMQ只有在收到持有消息的Consumer的应答信号时,才会删除掉消息,保证消息不因Consumer应用异常而导致消息丢失的问题发生。

看了消费端保证消息不丢失的方案,有小伙伴会有疑问,假如RabbitMQ已经把消息投递给了Consumer,Consumer正常的处理了消息,但是由于网络抖动等原因,RabbitMQ没有收到Consumer的ack消息,且认为Consumer已经断开连接,那么RabbitMQ会重新将消息放入队列,并投递给消费者。这样会导致某些消息重复投递给Consumer的问题产生。

在此种方案下RabbitMQ确实有可能产生重复消息的问题,我们将在接下来的文章中去处理这个问题。

该方案只保证消息至少一次投递(At least Once)

死信队列

DLX,全名Dead-Letter-Exchange,死信交换器。当一个消息变为死信(dead message)后,能够被重新DLX上,绑定DLX的队列就是死信队列。

消息变成私信有以下几种可能

  • 消息被拒绝(basicNack/basicReject),并且设置requeue参数为false;
  • 消息过期。
  • 队列超过最大长度。

下面通过一个简化的代码示例来演示下死信队列的使用。详细说明见注释

//声明交换器
channe1.exchangeDeclare("exchange.dlx","direct ",true);
channe1.exchangeDeclare( "exchange.normal "," fanout ",true);
Map<String , Object> args = new HashMap<String, Object>( );
//设置消息超时时间
args.put("x-message-ttl " , 10000);
//通过x-dead-letter-exchange参数来执行DLX
args.put( "x-dead-letter-exchange ","exchange.dlx");
//为DLX指定路由键
args.put( "x-dead-letter-routing-key"," routingkey");
channe1.queueDec1are( "queue.norma1 ",true,fa1se,fa1se,args);
channe1.queueBind( "queue.normal ","exchange .normal", "");
channe1.queueDec1are( "queue.d1x ", true , false , false , null) ;
channe1.queueBind( "queue.dlx","exchange.dlx ", routingkey");
channe1.basicPublish( "exchange.normal" , "rk" ,
MessageProperties.PERSISTENT_TEXT_PLAIN,"dlx".getBytes()) ;

消息流程见下图

RabbitMQ消息可靠性传输-LMLPHP

对于RabbitMQ来说,通过分析死信队列中的消息,可以用于改善和优化系统。

总结:消息丢失可能发生在生产端、服务端、消费端。对于重要业务我们可以通过上面介绍的方式来确保消息不丢失。大家也可以留言讨论下,在使用RabbitMQ过程中遇到过哪些坑。

参考文档

  1. RabbitMQ实战指南
  2. https://www.rabbitmq.com/reliability.html#what-can-fail
06-12 21:17