MQ系列12:如何保证消息顺序性

mq,系列,如何,保证,消息,顺序 · 浏览次数 : 552

小编点评

**MQ系列1:消息中间件执行原理** MQ系列消息中间件是实现消息传递的关键组件,它负责将消息从生产者到消费者之间传输。消息中间件通过一个称为消息队列的队列进行消息存储和传递。 **MQ系列2:消息中间件的技术选型** * **分布式架构:**MQ可以采用分布式架构,这使得它能够处理多个生产者和消费者之间的高并发的请求。 * **消息类型划分:**MQ可以根据消息的类型进行消息分发,提高消息处理效率。 * **持久化:**MQ可以持久化消息到磁盘,以确保即使服务器发生故障时仍可以处理消息。 * **消息版本ing:**MQ可以维护消息的版本,以便它能处理消息的更新请求。 **MQ系列3:RocketMQ 架构分析** RocketMQ 是一个基于 Apache Kafka 的消息中间件实现。它使用 Java 编程语言,并提供一个分布式架构,以处理大量消息并提高性能。 **MQ系列4:NameServer 原理解析** NameServer 是一个独立的名称服务,它负责注册和查找消息队列、主题和消费者。NameServer 使用三台服务器运行,以提供高可用性。 **MQ系列5:RocketMQ消息的发送模式** RocketMQ 支持多种消息发送模式,包括: * **有序消费模式:**在生产者将消息发送到消息队列之前,会按照顺序处理它们。 * **并发消费模式:**多个消费者可以从消息队列中同时消费消息。 **MQ系列6:消息的消费** 消息的消费由消息中间件执行,它使用消息监听器来监听到消息队列中的消息。当消息到达消息队列时,消息中间件将其转发到处理线程中。处理线程处理消息并将其传递给相应的消费者。 **MQ系列7:消息通信,追求极致性能** MQ系列提供多种优化技术,以提高消息传递的性能,包括: * **缓存机制:**MQ使用缓存机制来存储消息,以减少内存消耗。 * **多线程处理:**MQ使用多线程处理消息,以提高处理效率。 * **消息版本ing:**MQ使用消息版本ing机制来确保消息的完整性。 **MQ系列8:数据存储,消息队列的高可用保障** MQ使用文件存储机制来存储消息,文件系统是一种高性能且可扩展的存储解决方案。 **MQ系列9:高可用架构分析** MQ的架构采用高度可扩展的架构,以确保即使一个节点失效,消息仍能被处理。通过使用多个消息中间件,MQ可以创建多个处理线程,以处理消息。 **MQ系列10:如何保证消息幂等性消费** MQ提供了多种方法来保证消息幂等性消费,包括: * **使用多个消费者:**每个消费者可以处理一个消息。 * **使用消息版本ing:**MQ可以维护消息的版本,以便它能处理消息的更新请求。 * **使用负载均衡:**MQ可以将消息分配到多个处理节点上。 **MQ系列11:总结** MQ系列是一个高度可扩展且高性能的消息中间件,它可以用于各种应用程序,包括实时应用程序、企业级应用程序和云应用程序。

正文

MQ系列1:消息中间件执行原理
MQ系列2:消息中间件的技术选型
MQ系列3:RocketMQ 架构分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的发送模式
MQ系列6:消息的消费
MQ系列7:消息通信,追求极致性能
MQ系列8:数据存储,消息队列的高可用保障
MQ系列9:高可用架构分析
MQ系列10:如何保证消息幂等性消费
MQ系列11:如何保证消息可靠性传输

1 介绍

消息的有序性在很多业务场景中占有很重要的位置。
比如购物场景,需要按照 创建订单 --> 订单付款 --> 完成订单 顺序执行。
又比如出行场景,接单 --> 接送到达目的地 --> 付款 --> 完成订单。
这种是严格按照顺序执行的,这样的顺序消费才不会出问题,而且各个订单之间是互相独立和并行执行的。
所以,在MQ中,如何稳定地保证顺序性消息处理,是一个不可避免的话题。

2 消息的有序性说明

消息的有序执行,一般不是单个组件的能力。而是整个消息从生产,排队,存储到消费都是有序的,比如上面提到的购物和出行场景。
这就要求我们在消息队列(如果是Kafka,还是RocketMQ、RabbitMQ)中,保证以下前提:

  • 消息生产的有序性:即生产者组件有序发送消息
  • 消息入出队列的有序性:即消息是按照进入的先后顺序排队列放的,遵循FIFO原则。
  • 消息的存储的有序性:与上一点一致,部分场景下为了提高可用,就是要持久化到磁盘,这时候应该遵循有序存放,才能保证后续有序消费
  • 消息消费的有序性:即按照顺序进行消费。又分为全局顺序消息与部分顺序消息,全局是指Topic下的所有消息都要保证顺序;部分顺序消息保证每一组消息被顺序消费即可。

这边还有个问题,如果想让全局都是顺序性消费,那么只能用一个消费者去消费队列(一般来说也是单个生产者),这是会严重影响整体性能的,一般没这个,都是分组顺序执行消费的。
image

2.1 消息生产的有序性

要保证整个消息队列的有序性执行,首先要保证消息生产的有序性。
RocketMQ在Broker中防止了很多Topic,主题(Topic)可以看做消息的归类,我们将消息进行类型划分,相同类型的消息称为一个 Topic。比如我们在淘宝或京东上购买商品的的过程,就可能产生:购物车消息、交易消息、物流消息等,1条消息必然归属于1个 Topic 。
1个 Topic可以有0 ~ n 个生产者向其发送消息;也可以被 0~n 个消费者订阅和处理,于是就有出现了生产者组和消费者组,如下图:
image

或者同一个Topic中,创建不同的Queue,同一个消息生产者将消息隔离发送到不同的Queue中:
image

按照上述的模式,同理,我们只需要保证一组相同的消息按照给定的顺序存入同一个队列中,就能保证生产者有序存储,比如一次完整的消费过程:创建订单、付款、完成订单按照顺序在一个队列(Queue)中执行那就可以了。

★ 同时我们要保证同一组的消息在消息生产的时候投送到一个组中。这个相对来说不难,可以这么做:

  • 比如一个订单的多个子消息的父订单号是一致,我们把这些消息按照订单号取模,投送到对应的Queue中就行了,比如 订单号 % 队列数量( 163105015 % 9)
  • 发送消息自定义消息标签(消息标签可以用队列编号命名),一组消息使用同一个标签,改组标签对应的消息都投向标签所在的队列。

★ 业务程序方面,必须使用同步发送的方式,这样才能保证生产者发送的消息有序,否则按照FIFO的原则,很可能 订单完成 会被先消费。
但是我们业务程序,比如Java代码中为了提升性能,可能使用多线程的模式进行事件触发。多线程下保证生产者顺序性,可以使用锁并配合 spring的publish event(按照顺序执行的内部队列),持久化之后,再按照先进先出的顺序推送消息进入MQ中。
可以参考下 ,大概就是将你的事件进行顺序化一下。

★ 上述方法也不能完完全全的避免顺序化执行。如果broker服务发生故障,或者消息发生丢失,都有可能导致事件消费不完整,出现不一致的问题。

2.2 消息有序性存储

Broker 存储架构采用文件存储机制(类似Kafka),即直接在磁盘上使用文件来保存消息,而不是采用Redis或者MySQL之类的持久化工具。
它会把消息存储所属相关的文件存储在ROCKETMQ_HOME下,包含三个部分:

  • CommitLog 消息元数据
  • ConsumeQueue 消息逻辑队列
  • IndexFile 索引文件

存储消息的元数据,所有消息都会顺序存入到CommitLog文件中。
ConsumeQueue是指存储消息在CommitLog上的索引,一个MessageQueue一个文件,记录当前MessageQueue被哪些消费者组消费到了哪一条CommitLog。
所以一切都是顺序性操作下来的,而且按照 MessageQueue 做了隔离了,不用担心乱序的问题。详细参考 《MQ系列8:数据存储,消息队列的高可用保障

image

2.3 消息消费的有序性

最后一步就是消费的有序性了,既然消息生产和消息持久化都可以做到有序性。那么只要保证消费的有序性,就能保证整个消息队列的有序执行。
这边以RocketMQ为例子,RockerMQ采用MessageListener 回调函数进行监听,监听到消息之后进行数据处理。MessageListener主要提供了两种消费模式,如下:

  • 有序消费模式MessageListenerOrderly
  • 并发消费模式MessageListenerConcurrently

其中有序消费模式有序消费模式MessageListenerOrderly可以保证按照顺序进行消息处理。但是消费的业务代码实现是多线程并行的,依然是无法保证的。
实际上RocketMQ也是这么做的,MessageListenerConcurrently拉到消息之后会提交到线程池去消费,而MessageListenerOrderly则是通过分布式锁和本地锁保证同时只有一条线程去消费一个队列(Queue)上的数据。
这种消费模式就是使用以下3把锁来确保顺序性:

  • broker端的分布式锁
  • messageQueue的本地synchronized锁
  • ProcessQueue的本地consumeLock

3 总结

要消息的顺序性消费:需要保持先后顺序的消息放到同一个消息队列中(kafka中就是partition,rabbitMq中就是queue),然后使用线程池消费的时候使用分布式锁和本地锁保证同时只有一条线程去消费一个队列(Queue)上的数据。

与MQ系列12:如何保证消息顺序性相似的内容:

MQ系列12:如何保证消息顺序性

[MQ系列1:消息中间件执行原理](https://www.cnblogs.com/wzh2010/p/15888498.html "MQ系列1:消息中间件执行原理") [MQ系列2:消息中间件的技术选型](https://www.cnblogs.com/wzh2010/p/15311174.htm

MQ系列7:消息通信,追求极致性能

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 1 介绍 前面的章节我学习了 NameServer的原理,消息的生产发送,以及消息

MQ系列8:数据存储,消息队列的高可用保障

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 MQ系列7:消息通信,追求极致性能 1 介绍 在之前的章节中,我们介绍了消息的发送

MQ系列9:高可用架构分析

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 MQ系列7:消息通信,追求极致性能 MQ系列8:数据存储,消息队列的高可用保障 1

MQ系列10:如何保证消息幂等性消费

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 MQ系列7:消息通信,追求极致性能 MQ系列8:数据存储,消息队列的高可用保障 M

MQ系列11:如何保证消息可靠性传输(除夕奉上)

MQ系列1:消息中间件执行原理 MQ系列2:消息中间件的技术选型 MQ系列3:RocketMQ 架构分析 MQ系列4:NameServer 原理解析 MQ系列5:RocketMQ消息的发送模式 MQ系列6:消息的消费 MQ系列7:消息通信,追求极致性能 MQ系列8:数据存储,消息队列的高可用保障 M

MQ系列13:消息大量堆积如何为解决

[MQ系列1:消息中间件执行原理](https://www.cnblogs.com/wzh2010/p/15888498.html "MQ系列1:消息中间件执行原理") [MQ系列2:消息中间件的技术选型](https://www.cnblogs.com/wzh2010/p/15311174.htm

MQ系列14:MQ如何做到消息延时处理

[MQ系列1:消息中间件执行原理](https://www.cnblogs.com/wzh2010/p/15888498.html "MQ系列1:消息中间件执行原理") [MQ系列2:消息中间件的技术选型](https://www.cnblogs.com/wzh2010/p/15311174.htm

MQ 消息队列 比较

为什么需要消息队列 削峰 业务系统在超高并发场景中,由于后端服务来不及同步处理过多、过快的请求,可能导致请求堵塞,严重时可能由于高负荷拖垮Web服务器。 为了能支持最高峰流量,我们通常采取短平快的方式——直接扩容服务器,增加服务端的吞吐量。 优点是显而易见的,短时间内吞吐量增加了好几倍,甚至数十倍。

MQ消息积压,把我整吐血了

前言 我之前在一家餐饮公司待过两年,每天中午和晚上用餐高峰期,系统的并发量不容小觑。 为了保险起见,公司规定各部门都要在吃饭的时间轮流值班,防止出现线上问题时能够及时处理。 我当时在后厨显示系统团队,该系统属于订单的下游业务。 用户点完菜下单后,订单系统会通过发kafka消息给我们系统,系统读取消息