MQ系列8:数据存储,消息队列的高可用保障

mq,系列,数据,存储,消息,队列,可用,保障 · 浏览次数 : 572

小编点评

**2 Broker 存储架构RocketMQ采用文件存储机制** **3 存储的执行过程** * 客户端发送消息时,消息格式化成 CommitLog 的字段结构,并按照顺序写入到CommitLog 文件中。 * Broker会按照 ConsumeQueue 的字段结构要求创建一条索引记录。 * 按需创建IndexFile索引文件。 **4 存储的执行效率** * RocketMQ 使用 10W+ 级别的吞吐,可以通过多线程和缓存机制提升效率。 * 每个 MappedFile 包括多个 CommitLog 文件,并使用 MappedByteBuffer 提高效率。 * 每个 CommitLog 文件包含多个 MappedFile,以提高并发性。 * 通过存储消息数据进行索引,快速定位消息。 **5 总结** * RabbitMQ 中 Broker 存储消息采用文件存储机制,使用 CommitLog、ConsumeQueue 和 IndexFile 三部分组成。 * CommitLog 是消息数据的主要存储,使用 MappedFile 存储消息数据,以提升效率。 * RocketMQ 通过多线程、缓存和索引等技术实现高性能存储和读取。

正文

MQ系列1:消息中间件执行原理
MQ系列2:消息中间件的技术选型
MQ系列3:RocketMQ 架构分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的发送模式
MQ系列6:消息的消费
MQ系列7:消息通信,追求极致性能

1 介绍

在之前的章节中,我们介绍了消息的发送 和 消息通信 的原理。但是这边有一个比较核心的关键点,那就是如果已经把消息传递给Broker。在Broker在被消费之前,如何保证消息的稳定性,避免消息丢失和数据。
这时候就需要数据持久化数据来进行保障了。
根据之前我们 MQ系列2:消息中间件的技术选型 章节做的分析,RabbitMQ支持 1W+ 级别的吞吐,
Kafka 和 Rocket 支持 10W+ 级别的吞吐,想要实现这么大的吞吐,必须具备一个很强悍的存储功能。下面我们来看看。

2 Broker 存储架构

RocketMQ采用文件存储机制(类似Kafka),即直接在磁盘上使用文件来保存消息,而不是采用Redis或者MySQL之类的持久化工具。
它会把消息存储所属相关的文件存储在ROCKETMQ_HOME下,包含三个部分:

2.1 CommitLog 消息元数据

存储消息的元数据,所有消息都会顺序存入到CommitLog文件中。CommitLog由多个文件组成,每个文件固定大小1G。它有如下特征:

  • 单个文件默认大小为1G
  • 文件名称长度20,保存偏移量,偏移量不够20位的补0。
    • 如第1个文件没有偏移量,则为:00000000000000000000
    • 第2个文件起始偏移量为1073741824(1G=1073740842),则文件名为 00000000001073741824。
  • 第一个1G文件文件写满之后之后转入第2个文件,如此反复,因为是顺序的,所以写入效率较高。

2.2 ConsumeQueue 消息逻辑队列

ConsumeQueue是指存储消息在CommitLog上的索引,一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。它有如下特征:

  • ComsumeQueue的结构组成共 20 个字节,包含 8 字节的 commitlog 物理偏移量、4 字节的消息长度、8 字节 tag hashcode
  • ConsumeQueue 里只存偏移量信息,内容精悍。加载到内存中,操作效率非常高。
  • 一致性保障,CommitLog 里存储了 ConsumeQueues、Message Key、Tag 等所有信息,在 ConsumeQueue 丢失或者故障时候,数据可快速回复。
  • 因为每个Topic下可能有多个Queueu,所以存储结构为:HOME/store/consumequeue/{topic}/{queueId}/{fileName}。

2.3 IndexFile 索引文件

IndexFile 是一种可选索引文件,提供了一种可以通过 key 或时间区间来查询消息的方法,并且这种查找消息的方法不影响发送与消费消息的主流程。它的特征如下:

  • 算法原理:IndexFile 索引文件的底层实现 为 hash 索引,可以对照 Java 的HashMap比较,通过计算 Key 的 hashcode, 取余获得 hash 槽,并通过拉链法解决哈希冲突。
  • 大小限制:IndexFile 以创建时间戳命名,单个 IndexFile 文件大小约为 400M,一个 IndexFile 可以保存 2000W 个索引。

通过上面的三个部件说明可以了解到,RocketMQ 消息存储结构主要是由 CommitLog,ConsumeQueue,IndexFile 三部分组成的。当我们发送消息的时候,会执行如下过程:

  • 消息格式化成 CommitLog的字段结构,并按照顺序写入到CommitLog 文件中。
  • Broker会按照 ConsumeQueue 的字段结构的要求创建一条索引记录。
  • 按需创建IndexFile索引文件。

image

3 存储的执行过程

通过上面我们已经了解到了,Kafka 和 Rocket 均支持 10W+ 级别的吞吐,那么上述的存储结构是如何保持这样的高超性能的呢?

image

  • 之前的章节我们已经了解到,Broker 启动时同步启动 NettyRemotingServer 进行端口监听,等坐等客户端的连接。
  • 当客户端发送请求时,NettyRemotingServer WorkerGroup 处理可读事件,执行 processRequestCommand 处理来源消息数据。
  • 接收到消息之后就需要存储下来了,DefaultMessageStore对数据进行校验,校验如下,校验完成之后发送存储指令。
    • Broker 无响应时拒绝消息写入
    • Broker 角色 为 SLAVE 时也拒绝写入
    • 判断是否支持写入,不支持写入时也拒绝
    • topic length 小于等于 256 字符,否则拒绝消息写入
    • 消息 length 小于等于 65536 字符,否则拒绝消息写入
    • PageCache 繁忙时报错误消息,无法写入
  • DefaultMessageStore 调用 CommitLog.putMessage 存入消息
    • 获取可以写入的 CommitLog 进行写入
    • CommitLog(每个CommitLog默认1G大小) 对应 MappedFile(程序视角),当有多个 MappedFiled 时,组成 MappedFileQueue。
    • MappedFile 持有物理 CommitLog 的 fileChannel (Java NIO 文件读写的通道),但并没有通过 fileChannel 直接访问物理 CommitLog 文件,而是映射到一个 MappedByteBuffer,并把序列化后的消息写入这个 ByteBuffer 中,已达到提升执行效率的目的。
    • 最后写入 MappedFile 相对应的 CommitLog 文件中。

4 总结

  • 理解好RabbitMQ 中 Broker 存储的组成要素 CommitLog,ConsumeQueue,IndexFile。
  • 当 Broker 收到消息存储请求时,通过调用 CommitLog 对应的 MappedFile,把消息写入MappedFile的MeppedByteBuffer(内存映射)。

与MQ系列8:数据存储,消息队列的高可用保障相似的内容:

MQ系列8:数据存储,消息队列的高可用保障

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 MQ系列7:消息通信,追求极致性能 1 介绍 在之前的章节中,我们介绍了消息的发送

MQ系列9:高可用架构分析

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 MQ系列7:消息通信,追求极致性能 MQ系列8:数据存储,消息队列的高可用保障 1

MQ系列10:如何保证消息幂等性消费

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 MQ系列7:消息通信,追求极致性能 MQ系列8:数据存储,消息队列的高可用保障 M

MQ系列11:如何保证消息可靠性传输(除夕奉上)

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 MQ系列7:消息通信,追求极致性能 MQ系列8:数据存储,消息队列的高可用保障 M

MQ系列7:消息通信,追求极致性能

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 1 介绍 前面的章节我学习了 NameServer的原理,消息的生产发送,以及消息

MQ系列12:如何保证消息顺序性

[MQ系列1:消息中间件执行原理](https://www.cnblogs.com/wzh2010/p/15888498.html "MQ系列1:消息中间件执行原理") [MQ系列2:消息中间件的技术选型](https://www.cnblogs.com/wzh2010/p/15311174.htm

MQ系列13:消息大量堆积如何为解决

[MQ系列1:消息中间件执行原理](https://www.cnblogs.com/wzh2010/p/15888498.html "MQ系列1:消息中间件执行原理") [MQ系列2:消息中间件的技术选型](https://www.cnblogs.com/wzh2010/p/15311174.htm

MQ系列14:MQ如何做到消息延时处理

[MQ系列1:消息中间件执行原理](https://www.cnblogs.com/wzh2010/p/15888498.html "MQ系列1:消息中间件执行原理") [MQ系列2:消息中间件的技术选型](https://www.cnblogs.com/wzh2010/p/15311174.htm

MQ 消息队列 比较

为什么需要消息队列 削峰 业务系统在超高并发场景中,由于后端服务来不及同步处理过多、过快的请求,可能导致请求堵塞,严重时可能由于高负荷拖垮Web服务器。 为了能支持最高峰流量,我们通常采取短平快的方式——直接扩容服务器,增加服务端的吞吐量。 优点是显而易见的,短时间内吞吐量增加了好几倍,甚至数十倍。

MQ消息积压,把我整吐血了

前言 我之前在一家餐饮公司待过两年,每天中午和晚上用餐高峰期,系统的并发量不容小觑。 为了保险起见,公司规定各部门都要在吃饭的时间轮流值班,防止出现线上问题时能够及时处理。 我当时在后厨显示系统团队,该系统属于订单的下游业务。 用户点完菜下单后,订单系统会通过发kafka消息给我们系统,系统读取消息