本文图片和部分总结来自于参考资料,半原创,侵删

问题

  • Rocketmq 重试是否有超时问题,假如超时了如何解决,是重新发送消息呢?还是一直等待
  • 假如某个 msg 进入了重试队列(%RETRY_XXX%),然后成功消费了

概述

    文章介绍了RocketMQ 的重试机制和消息重试的机制。

定时任务

定时任务概述

    rocketmq为定时任务创建一个单独的 topic ,而 rocketmq的定时任务是定的时间是分等级的,而不同等级对应topic内不同的队列,然后通过一个“执行定时任务的服务”定时执行多个队列内的任务,执行时需要更改该定时任务实际要发送的 topic 和 tag 。

发送例子

发送例子

Message msg =
    new Message("TopicTest" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);
msg.setDelayTimeLevel(i + 1);

    时间等级

public class MessageStoreConfig {

    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    
}

### 写入定时任务
    写入的时候是在写入commitLog 的时候写入的,这一点很重要,因为这也是实现消费失败重试的基础。 CommitLog 会将这条消息的话题和队列 ID 替换成专门用于定时的话题和相应的级别对应的队列 ID。真实的话题和队列 ID 会作为属性放置到这条消息中,后面处理的时候会自己从这个队列id 进行发送消息。

public class CommitLog {

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

    // Delay Delivery
    if (msg.getDelayTimeLevel() > 0) {

        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

        // Backup real topic, queueId
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

        // 替换 Topic 和 QueueID
        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
    
}

}



### 处理定时任务

    执行定时任务的服务,ScheduleMessageService 的 start 方法

    public void start() {

        for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
            Integer level = entry.getKey();
            Long timeDelay = entry.getValue();
            Long offset = this.offsetTable.get(level);
            if (null == offset) {
                offset = 0L;
            }

            if (timeDelay != null) {
                //Timer 持有多个定时任务,然后时间到了就执行该任务,
                // 但是 Timer 内部只有一个线程在执行任务,也就不能保证时间的正确性(因为当一个线程在执行的时候,某个任务的时间已经到了)
                // 注意,是为每个延时时间等级建一个任务Task
                this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
            }
        }

        this.timer.scheduleAtFixedRate(new TimerTask() {

            @Override
            public void run() {
                try {
                    ScheduleMessageService.this.persist();
                } catch (Throwable e) {
                    log.error("scheduleAtFixedRate flush exception", e);
                }
            }
        }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
    }

class DeliverDelayedMessageTimerTask extends TimerTask {

    public void executeOnTimeup() {
        // ...
        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
            // 是否到时间
            long countdown = deliverTimestamp - now;

            if (countdown <= 0) {
                // 取出消息
                MessageExt msgExt =
                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                // 修正消息,设置上正确的话题和队列 ID
                MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                // 重新存储消息
                PutMessageResult putMessageResult =
                    ScheduleMessageService.this.defaultMessageStore
                    .putMessage(msgInner);
            } else {
                // countdown 后投递此消息
                ScheduleMessageService.this
                    .timer
                    .schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown);
                // 更新偏移量
            }
        } // end of for

        // 更新偏移量
    }
    
}

    同时该定时任务也进行持久化,一个是消费进度,一个消息对应的位移量

消息消费重试

    RocketMQ中遇到以下情况就会进行消息重试 :

  • 抛出异常
  • 返回 NULL 状态
  • 返回 RECONSUME_LATER 状态
  • 超时 15 分钟没有响应

consumer 注册订阅重试队列

    consumer 在启动的时候就会订阅“%RETRY_XXX%”的topic,为的就是当某个topic消费失败处理重试消息。如下图所示 :

public class DefaultMQPushConsumerImpl implements MQConsumerInner {

    public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
        case CREATE_JUST:
            // ...
            this.copySubscription();
            // ...
        
            this.serviceState = ServiceState.RUNNING;
            break;
        }
    }

    private void copySubscription() throws MQClientException {
        switch (this.defaultMQPushConsumer.getMessageModel()) {
        case BROADCASTING:
            break;
            
        case CLUSTERING:
            // 重试话题组
            final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
            SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                                                                                retryTopic, SubscriptionData.SUB_ALL);
            this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
            break;
            
        default:
            break;
        }
    }
    
}

超时消费

    我们思考一个问题,假如消费者掉线了,那么消息直接发不过去了,而要是消费者的消费逻辑执行了太久的业务逻辑,那么应该有一个动作来触发消费超时,进行重试.

ConsumeMessageConcurrentlyService 的 start 方法。

    public void start() {
        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                cleanExpireMsg();
            }

        }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
    }

这个定时周期任务每过 getConsumeTimeout 时间就会扫描消费超时的任务,调用 sendMessageBack 方法,该方法会调用 RPC发送消息给 broker ,消费失败进行重试。

    上一篇我们讲到消息消费的过程,当集群模式下,消息消费成功会本地的消息消费进度,而失败了会调用RPC 发送消息给broker ,而broker 处理的逻辑在 SendMessageProcessor

    @Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
        switch (request.getCode()) {

            //消费者消费失败的情况
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.consumerSendMsgBack(ctx, request);
            default:

                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return null;
                }

                mqtraceContext = buildMsgContext(ctx, requestHeader);
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);

                RemotingCommand response;
                if (requestHeader.isBatch()) {
                    response = this.sendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
                }

                this.executeSendMessageHookAfter(response, mqtraceContext);
                return response;
        }
    }

批量处理的问题

    批量处理一批数据要是返回 RECONSUME_LATER ,那么这批数据就会重新发给 broker ,进行消息重试,所以在业务逻辑的时候就要考虑消费者重新消费的幂等性。

    ConsumeRequest的 run 方法

        @Override
        public void run() {
            ....

            try {
                ConsumeMessageConcurrentlyService.this.resetRetryTopic(msgs);
                if (msgs != null && !msgs.isEmpty()) {
                    for (MessageExt msg : msgs) {
                        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                    }
                }
                //NO.1 业务实现
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
                        RemotingHelper.exceptionSimpleDesc(e),
                        ConsumeMessageConcurrentlyService.this.consumerGroup,
                        msgs,
                        messageQueue);
                hasException = true;
            }

            ...

            if (!processQueue.isDropped()) {
                //NO.2 处理消息消费的结果
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            } else {
                log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
            }
        }

ack 机制

    public void processConsumeResult(
            final ConsumeConcurrentlyStatus status,
            final ConsumeConcurrentlyContext context,
            final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();

        if (consumeRequest.getMsgs().isEmpty())
            return;

        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                        consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                //发送给broker , 该批数据进行消息重试
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);

                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        //删除消息树中的已消费的消息节点,并返回消息树中最小的节点,更新最小的节点为当前进度!!
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

可以看到消息消费完成后,更新的进度都是对应的 processqueue中对应的消息树里的最小节点(即偏移量最小的节点),那么有可能存在这样的问题,下面来自 参考

这钟方式和传统的一条message单独ack的方式有本质的区别。性能上提升的同时,会带来一个潜在的重复问题——由于消费进度只是记录了一个下标,就可能出现拉取了100条消息如 2101-2200的消息,后面99条都消费结束了,只有2101消费一直没有结束的情况。

在这种情况下,RocketMQ为了保证消息肯定被消费成功,消费进度职能维持在2101,直到2101也消费结束了,本地的消费进度才能标记2200消费结束了(注:consumerOffset=2201)。

在这种设计下,就有消费大量重复的风险。如2101在还没有消费完成的时候消费实例突然退出(机器断电,或者被kill)。这条queue的消费进度还是维持在2101,当queue重新分配给新的实例的时候,新的实例从broker上拿到的消费进度还是维持在2101,这时候就会又从2101开始消费,2102-2200这批消息实际上已经被消费过还是会投递一次。

总结

    从参考资料中我学习到了自己学习与别人的差异是总结的能力,通过浓缩代码片段,总结核心的逻辑步骤,加深对逻辑的理解。

参考资料

  • https://www.jianshu.com/p/5843cdcd02aa
  • http://jaskey.github.io/blog/2017/01/25/rocketmq-consume-offset-management/
12-25 06:33