本文主要研究一下rocketmq producer的batch

batch

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    //......

    private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
        MessageBatch msgBatch;
        try {
            msgBatch = MessageBatch.generateFromList(msgs);
            for (Message message : msgBatch) {
                Validators.checkMessage(message, this);
                MessageClientIDSetter.setUniqID(message);
                message.setTopic(withNamespace(message.getTopic()));
            }
            msgBatch.setBody(msgBatch.encode());
        } catch (Exception e) {
            throw new MQClientException("Failed to initiate the MessageBatch", e);
        }
        msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
        return msgBatch;
    }

    //......
}
  • DefaultMQProducer的batch方法接收Message集合,它会使用MessageBatch.generateFromList创建MessageBatch,之后遍历MessageBatch校验message,设置唯一id,更新topic以及body;最后返回MessageBatch

MessageBatch

rocketmq-common-4.6.0-sources.jar!/org/apache/rocketmq/common/message/MessageBatch.java

public class MessageBatch extends Message implements Iterable<Message> {

    private static final long serialVersionUID = 621335151046335557L;
    private final List<Message> messages;

    private MessageBatch(List<Message> messages) {
        this.messages = messages;
    }

    public byte[] encode() {
        return MessageDecoder.encodeMessages(messages);
    }

    public Iterator<Message> iterator() {
        return messages.iterator();
    }

    public static MessageBatch generateFromList(Collection<Message> messages) {
        assert messages != null;
        assert messages.size() > 0;
        List<Message> messageList = new ArrayList<Message>(messages.size());
        Message first = null;
        for (Message message : messages) {
            if (message.getDelayTimeLevel() > 0) {
                throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");
            }
            if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                throw new UnsupportedOperationException("Retry Group is not supported for batching");
            }
            if (first == null) {
                first = message;
            } else {
                if (!first.getTopic().equals(message.getTopic())) {
                    throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
                }
                if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
                    throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
                }
            }
            messageList.add(message);
        }
        MessageBatch messageBatch = new MessageBatch(messageList);

        messageBatch.setTopic(first.getTopic());
        messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
        return messageBatch;
    }

}
  • MessageBatch继承了Message,实现了Iterable<Message>接口;其generateFromList方法会遍历messages,最后组装成messageBatch

小结

DefaultMQProducer的batch方法接收Message集合,它会使用MessageBatch.generateFromList创建MessageBatch,之后遍历MessageBatch校验message,设置唯一id,更新topic以及body;最后返回MessageBatch

doc

03-05 18:08