前言

三此君看了好几本书,看了很多遍源码整理的 一张图进阶 RocketMQ 图片,关于 RocketMQ 你只需要记住这张图!觉得不错的话,记得点赞关注哦。
一张图进阶 RocketMQ - 消息存储-LMLPHP
【重要】视频在 B 站同步更新,欢迎围观,轻轻松松涨姿势。一张图进阶 RocketMQ-消息存储(视频版)
点击查看【bilibili】

本文是“一张图进阶 RocketMQ”第 5 篇,对 RocketMQ 不了解的同学可以先看看前面 4 期:

  1. 一张图进阶 RocketMQ-整体架构
  2. 一张图进阶 RocketMQ - NameServer
  3. 一张图进阶 RocketMQ - 消息发送
  4. 一张图进阶 RocketMQ - 通信机制

前面两期我们主要分享了 RocketMQ 是如何将消息发送出去的,现在消息已经被 Netty 送上路了,接力棒已经交给了 Broker。如果我们自己来实现 Broker 会怎么实现呢?首先肯定得把消息存起来吧,不然宕机了,消息丢失了,那就离大谱了。

可是消息要以什么结构存储呢?二进制、JSON、PB?从功能上来看肯定都是可以的,那 RocketMQ 到底是怎么搞的?

解决了存储结构问题,那消息存到哪里呢?数据库,本地文件,还是对象存储服务器?从功能的角度肯定也都是可以的。可是,哪家数据库可以支持单机十万级吞吐量?那我直接统统存到数据库得了,瞎折腾些啥。难道存在本地文件就可以了?我们自己实现不可以,但是 RocketMQ 可以,那 RocketMQ 有什么黑科技呢?

所以我们今天就来聊一聊 Broker 如何存储消息,【首先明确我们的目标】我们需要先了解 RocketMQ 的存储结构,也就是消息是如何组织的。了解了存储结构,我们才能更好的理解存储流程,不然我们不知道为什么流程是这样的。最后我们需要了解有哪些机制支撑 RocketMQ 单机十万级吞吐量。

存储架构

一张图进阶 RocketMQ - 消息存储-LMLPHP
消息在 Broker 上的存储结构如上图,所有相关文件放在 ROCKETMQ_HOME 下,有哪些文件呢?存放消息本身的 CommitLog,以及消息的索引文件 ConsumeQueue 和 IndexFile:

  • CommitLog

从物理结构上来看,所有的消息都存储在 CommitLog 里面,其实就是所有的消息按照“消息在 CommitLog 各字段示意图”所示,挨个按顺序存储到文件中。

单个 CommitLog 文件大小默认 1G ,文件名长度为 20 位,左边补零,剩余为起始偏移量。比如 00000000000000000000 代表了第一个文件,起始偏移量为 0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为 00000000001073741824,起始偏移量为 1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。CommitLog 顺序写,可以大大提高写入效率。

但是问题来了,消息发送的时候我们指定了 Topic,现在所有 Topic 都顺序个写入到 CommitLog,存入的时候是安逸了(顺序写),但是获取消息可就麻烦了。如果我要获取某个 Topic 的消息,需要遍历 commitlog 文件,根据 topic 过滤消息。CommitLog 这个渣男,只管自己爽。有什么办法可以提高消息查询效率呢?

  • ConsumeQueue

我们再回忆一下,消息存入的时候是指定了 Topic,同时我们也说了每个 Topic 会对应多个 ConsumeQueue( queueId 标识)。关键就在 ConsumeQueue 上,ConsumeQueue 是指定 Topic 消息的索引文件,怎么理解呢?从“消息在 ConsumeQueue 各字段示意图”可知,每个条目共 20 个字节,分别为 8 字节的 commitlog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode,单个文件由 30W 个条目组成,可以像数组一样随机访问每一个条目,每个 ConsumeQueue 文件大小约 5.72M。ConsumeQueue 文件可以看成是基于 topic 的 commitlog 索引文件。Consumer 即可根据 ConsumeQueue 来查找待消费的消息。

因为 ConsumeQueue 里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的 ConsumeQueue 能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证 CommitLog 和 ConsumeQueue 的一致性,CommitLog 里存储了 ConsumeQueues、Message Key、Tag 等所有信息,即使 ConsumeQueue 丢失,也可以通过 CommitLog 完全恢复出来。
ConsumeQueue 文件夹的组织方式如下:topic/queue/file 三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。

  • IndexFile

IndexFile 是另一种可选索引文件,提供了一种可以通过 key 或时间区间来查询消息的方法。 IndexFile 索引文件其底层实现为 hash 索引,类似于 Java 1.7 HashMap,计算 Key 的 hashcode,hashcode 取余得到 hash 槽,拉链法解决哈希冲突。Index 文件的存储位置是:$HOME \store\index${fileName},文件名 fileName 是以创建时的时间戳命名的,固定的单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引。

所以,RocketMQ 消息存储架构主要有 CommitLog,ConsumeQueue,IndexFile 构成。我们发送一条消息,会先格式化成“消息在 CommitLog 各字段示意图”中的样子,顺序写入 CommitLog 中,然后 Broker 会按照 ”消息在 ConsumeQueue 各字段示意图“所示构建一条索引记录,存入该消息所属 Topic 的 ConsumeQueue 索引文件中。如果有 IndexFile,还会构建 IndexFile。

现在我们已经知道了 RocketMQ 消息的存储结构,接下来我们的就要了解 RocketMQ 是如何构建 CommitLog、ConsumeQueue 和 IndexFile,以及 RocketMQ 如何保证性能,支撑单机十万级吞吐量的?这是本文的主要目标,一定要抓住主要目标,不要走丢咯。

启动流程

了解了 RocketMQ 消息在磁盘中是怎么存储的,我们就可以来看看具体的存储流程了。首先,还是先来看看 Broker 的启动流程。初始化过程都是这个鸟样,只看初始化过程完全不知所云,但是不看初始化过程,直接看具体执行流程也是摸不着头脑,一堆组件不知道从哪里来的,所以我们还是先耐着性子大致看看。但这并不是我们关注的重点,注意几个关键点即可。
一张图进阶 RocketMQ - 消息存储-LMLPHP

  • 初始化启动环境。部署好 RocketMQ 后,执行/bin/mqbroker 脚本,主要用于设置 RocketMQ 目录环境变量,例如 ROCKETMQ_HOME 。然后调用 ./bin/runbroker.sh 进入 RocketMQ 的启动入口,主要设置了 JVM 启动参数,比如 JAVA_HOME、Xms、Xmx。执行 main 函数。
  • 初始化 BrokerController。该初始化主要包含 RocketMQ 启动命令行参数解析、NettyRemotingServer 初始化、Broker 各个模块配置参数解析、Broker 各个模块初始化、进程关机 Hook 初始化等过程。
  • 启动 RocketMQ 的各个组件。但是这些组件并不是每一个都是核心组件,部分组件会在后面的流程中使用,这里混个眼熟,如果后面流程没有提及的大家可以暂且跳过,我们的目标是把握 RocketMQ 的核心内容,而不是每个细节。

存储流程

一张图进阶 RocketMQ - 消息存储-LMLPHP
在前面 RocketMQ 存储结构中我们了解了 RocketMQ 将所有消息顺序写入 CommitLog,然后构建 ConsumeQueue/IndexFile 索引文件,所以这个小结我们主要的目标就是看看这些文件是如何构建的。

  • Broker 启动流程中很关键的一点是启动了 NettyRemotingServer,在 RocketMQ 通信机制(视频) 中我们介绍过 Broker(NettyRemotingServer) 初始化会监听端口等待客户端连接,当客户端发送请求的时,NettyRemotingServer WorkerGroup 处理可读事件,调用 NettyServerHandler.channelRead0() 处理数据。
    接着调用链到 processRequestCommand 方法,这个方法主要是根据请求中的 RequestCode,从本地缓存 processorTable 中获取相应的 Processor 来执行后续逻辑。处理器是什么?处理器的缓存从哪里来?

  • SendMessageProcessor.processRequest 调用 sendMessage 方法,主要包含消息的校验及重试逻辑处理,然后调用存储模块 DefaultMessageStore 存储消息。
    消息校验:校验 Broker 是否配置可写,校验 Topic 名字是否为默认值,获取或创建 topicConfig,判断 queueId 是否超过限制。
    重试消息处理:消费者消费失败后会将消息发回给 Broker,这里我们暂且认为就是生产者发送的请求,先看下面的流程。

  • DefaultMessageStore.putMessage 只是做了很多的校验,简单看看即可。包括:如果当前 Broker 停止工作则拒绝消息写入、Broker 为 SLAVE 角色则拒绝消息写入、当前 RocketMQ 不支持写入则拒绝消息写入、主题长度超过 256 个字符则拒绝消息写入、消息属性长度超过 65536 个字符则拒绝消息写入、PageCache 忙则报错。然后调用 CommitLog.putMessage 存入消息。

  • 看到这里应该稍微熟悉一些了,终于到我们期待已久的 CommitLog 出场了。主要是延迟消息处理,然后获取可以写入的 CommitLog 进行写入。
    延迟消息处理:如果消息的延迟级别大于 0,将消息的原主题名称与原消息队列 ID 存入消息属性中,用延迟消息主题 SCHEDULE_TOPIC、消息队列 ID 更新原先消息的主题与队列,这是并发消息消费重试关键的一步。但不是这个本节的主要目标,后文会进一步分析。
    关键点在如何获取可以写入的 CommitLog。存储结构小节里面有提到每个 CommitLog 默认大小 1G,写完一个文件,以偏移量命名创建下一个文件。每个 1G 大小 CommitLog 的在代码层面对应的是 MappedFile,而多个 MappedFiled 组成 MappedFileQueue。逻辑上的 CommitLog 通过持有 MappedFileQueue 管理多个 MappedFile。所以,获取可以写入的 CommitLog 也就是获取 MappedFileQueue 最后一个 MappedFile,为什么是最后一个,因为前面的已经写完了呀。来看看 RocketMQ 逻辑与物理存储的对应关系应该能够更直观的理解。一张图进阶 RocketMQ - 消息存储-LMLPHP

  • 获取到最后一个 MappedFile 后,调用 MappedFile.appendMessage 将消息追加到该文件中。可是尽管是顺序写入,但是连小学生都知道写磁盘还是很慢,难道想这样支撑 RocketMQ 单机十万吞吐量?too young too simple!从逻辑存储结构和物理存储结构的映射关系来看,MappedFile 持有物理 CommitLog 的 fileChannel (Java NIO 文件读写的通道),通过 fileChannel 可以访问物理 CommitLog 文件,但是 RocketMQ 并没有直接使用 fileChannel,而是映射到一个 MappedByteBuffer,我们的目的就是把消息写入这个 ByteBuffer 中,进而写入 MappedFile 对应的 CommitLog 文件。为什么需要这样做,还有哪些细节,会在”文件内存映射“小结中为大家解答。

  • 继续看流程,得到 MappedFile 对应的 ByteBuffer,我们需要将消息序列化,写入 ByteBuffer 中。

    1. 构建消息 id, createMessageId
    2. 获取该消息在消息队列的偏移量,CommitLog 中保存了当前所有消息队列的当前待写入偏移量。
    3. 判断是否是事务消息:这里主要处理 Prepared 类型和 Rollback 类型的消息,设置消息 queueOffset 为 0
    4. 计算消息总大小,calMsgLength。
    5. 判断文件的剩余空间,是否足够写入当条消息,如果不可以,则将文件末尾写入剩余空间大小+固定魔数;然后返回一个 END_OF_FILE 的结果
    6. 如果空间足够,这将这条消息写入之前得到的 MappedFile 的 ByteBuffer 中。
    7. 将各字段按照”消息在 CommitLog 各字段示意图“存入 Bytebuffer,然后返回 PUT_OK 结果

总结

以上就是今天 RocketMQ 消息存储的主要内容,消息只是写入到 CommitLog 对应的 ByteBuffer中,下一期就是我们重要的零拷贝即将登场。我们简单总结一下今天的内容:

  • 要理解消息的存储流程需要先知道消息的存储结构:在物理上消息挨个顺序写入 CommitLog,为了提升消息查询效率需要构建消息的索引文件 ConsumeQueue/IndexFile;
  • Broker 启动时进行参数解析,并初始化了 NettyRemotingServer,启动存储服务用于消息存储及索引构建等;
  • Broker 收到消息存储请求,经过层层校验,获取 CommitLog 对应的 MappedFile,将消息写入MappedFile 对应的内存映射ByteBuffer;

以上就是今天全部的内容,如果觉得本期的内容对你有用的话记得点赞、关注、转发、收藏,这将是对我最大的支持。

如果你需要 RocketMQ 相关的所有资料,可以评论区留言,或者关注公众号:三此君。回复:mq,即可。

消息已经写入 ByteBuffer,写入 ByteBuffer 就可以了吗?那收到消息直接丢弃岂不是更好。消息要落在磁盘上才不会丢失,所以下一期我们要分享的就是消息的刷盘及索引构建,PageCache 及零拷贝也将闪亮登场。感谢观看,下期不见不散。
一张图进阶 RocketMQ - 消息存储-LMLPHP

参考文献

  • RocketMQ 官方文档
  • RocketMQ 源码
  • 丁威, 周继锋. RocketMQ技术内幕:RocketMQ架构设计与实现原理. 机械工业出版社, 2019-01.
  • 李伟. RocketMQ分布式消息中间件:核心原理与最佳实践. 电子工业出版社, 2020-08.
  • 杨开元. RocketMQ实战与原理解析. 机械工业出版社, 2018-06.
07-21 20:51