消费者拉取消息(Pull)示例

消费者使用Pull方式拉取消息的流程和Push消息的流程基本类似,包括创建消费者对象、设置组名、启动消费者消费。代码如下:

package com.wjw;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class PullConsumer {
    // 存储队列offset
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<>();

    public static void main(String[] args) throws Exception{
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("group A");
        // 启动消费者
        consumer.start();

        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("Target Topic");
        for (MessageQueue mq : mqs) {
            System.out.println("Consume message from " + mq);
            // 拉取消息
            PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, OFFSET_TABLE.get(mq), 32);
            System.out.println("pullResult : " + pullResult);
            // 设置该MQ的offset
            OFFSET_TABLE.put(mq, pullResult.getNextBeginOffset());
        }
        consumer.shutdown();
    }
}

将上面的流程概括一下:

  1. 创建Pull模式的消费者对象
  2. 启动消费者消费
  3. 调用fetchSubscribeMessageQueues方法,根据Topic名称查询对应的MQ,主动拉取消息
  4. 循环遍历MQ,对于遍历到的每个MQ,取出一条消息

fetchSubscribeMessageQueues

获取所有MQ的方法源码如下,该方法位于org/apache/rocketmq/client/impl/MQAdminImpl.java中:

public Set<MessageQueue> fetchSubscribeMessageQueues(String topic) throws MQClientException {
        try {
        	// 从注册中心获取路由信息
            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);
            // 如果路由信息不为空则获取路由信息中的队列集合
            if (topicRouteData != null) {
                Set<MessageQueue> mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
                if (!mqList.isEmpty()) {
                    return mqList;
                } else {
                    throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);
                }
            }
        } catch (Exception e) {
            throw new MQClientException(
                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),
                e);
        }

        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);
    }

上述代码首先从注册中心中获取TopicRouteData,其中存储了路由信息:

RocketMQ 消费者拉取消息(Pull) 解析——图解、源码级解析-LMLPHP

  • orderTopicConf:顺序消息配置
  • queueDatas:队列数据数组
  • brokerAddr:Broker数据数组
  • filterServerTable:Broker地址和Filter Server之间的映射

如果拿到的TopicRouteData不为空,则提取TopicRouteData内的QueueData生成MQ,这个MQ就是当前订阅的Topic下的。如果队列集合不为空,就会直接返回。


拉取消息的核心代码

拉取消息的核心方法是pullSyncImpl,在这个方法里实现了消息的拉取

private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,
        long timeout)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.isRunning();

        if (null == mq) {
            throw new MQClientException("mq is null", null);
        }

        if (offset < 0) {
            throw new MQClientException("offset < 0", null);
        }

        if (maxNums <= 0) {
            throw new MQClientException("maxNums <= 0", null);
        }

        this.subscriptionAutomatically(mq.getTopic());

        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;

        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());
        // 拉取消息
        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(
            mq,
            subscriptionData.getSubString(),
            subscriptionData.getExpressionType(),
            isTagType ? 0L : subscriptionData.getSubVersion(),
            offset,
            maxNums,
            sysFlag,
            0,
            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),
            timeoutMillis,
            CommunicationMode.SYNC,
            null
        );
        
        // 对消息数据进行处理
        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);
        // 如果namespace不是空的,则重置没有命名空间的Topic。
        this.resetTopic(pullResult.getMsgFoundList());
        
        // 把消息数据设置到上下文对象ConsumeMessageContext里
        if (!this.consumeMessageHookList.isEmpty()) {
            ConsumeMessageContext consumeMessageContext = null;
            consumeMessageContext = new ConsumeMessageContext();
            consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());
            consumeMessageContext.setConsumerGroup(this.groupName());
            consumeMessageContext.setMq(mq);
            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());
            consumeMessageContext.setSuccess(false);
            this.executeHookBefore(consumeMessageContext);
            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());
            consumeMessageContext.setSuccess(true);
            this.executeHookAfter(consumeMessageContext);
        }
        return pullResult;
    }
10-01 07:29