【RocketMQ】顺序消息实现总结

RocketMQ,消息,顺序 · 浏览次数 : 0

小编点评

**消息队列锁** **方法一:mqLockTable.get(mq)** * 获取消息队列对应的对象锁。 * 如果锁获取失败,创建新的锁对象并加入到Map中。 * 返回获取的锁对象。 **方法二:mqLockTable.putIfAbsent(mq, objLock)** * 将消息队列和对象锁加入到Map中。 * 如果锁获取失败,尝试获取锁。 * 如果锁获取成功,返回获取的锁对象。 **方法三:mqLockTable.remove(mq, objLock)** * 从Map中移除消息队列和对象锁。 * 如果Map中没有该消息队列,则释放锁。 * 返回移除的锁对象。

正文

全局有序
在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一般不使用。

局部有序
假设一个Topic分配了两个消息队列,生产者在发送消息的时候,可以对消息设置一个路由ID,比如想保证一个订单的相关消息有序,那么就使用订单ID当做路由ID,在发送消息的时候,通过订单ID对消息队列的个数取余,根据取余结果选择消息队列,这样同一个订单的数据就可以保证发送到一个消息队列中,消费者端使用MessageListenerOrderly处理有序消息,这就是RocketMQ的局部有序,保证消息在某个消息队列中有序。

接下来看RoceketMQ源码中提供的顺序消息例子(稍微做了一些修改):

生产者

public class Producer {
    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            // 创建生产者
            DefaultMQProducer producer = new DefaultMQProducer("生产者组");
            // 启动
            producer.start();
            // 创建TAG
            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            for (int i = 0; i < 100; i++) {
                // 生成订单ID
                int orderId = i % 10;
                // 创建消息
                Message msg =
                    new Message("TopicTest", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // 获取订单ID
                        Integer id = (Integer) arg;
                        // 对消息队列个数取余
                        int index = id % mqs.size();
                        // 根据取余结果选择消息要发送给哪个消息队列
                        return mqs.get(index);
                    }
                }, orderId); // 这里传入了订单ID
                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消费者

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        // 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("消费者组");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 订阅主题
        consumer.subscribe("TopicTest", "TagA || TagC || TagD");
        // 注册消息监听器,使用的是MessageListenerOrderly
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                // 打印消息
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

从例子中可以看出生产者在发送消息的时候,通过订单ID作为路由信息,将同一个订单ID的消息发送到了同一个消息队列中,保证同一个订单ID相关消息有序发送,接下来就看消费者是如何保证消息的顺序消费的。

定时任务对消息队列加锁

消费者在启动的时候,会对是否是顺序消费进行判断(监听器是否是MessageListenerOrderly类型来判断),如果是顺序消费,会使用ConsumeMessageOrderlyService,并调用它的start方法进行启动,在集群模式模式下,start方法中会启动一个定时加锁的任务,周期性的对该消费者负责的消息队列进行加锁。

为什么集群模式下需要加锁?
因为广播模式下,消息队列会分配给消费者下的每一个消费者,而在集群模式下,一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,所以在广播模式下不存在竞争关系,也就不需要对消息队列进行加锁,而在集群模式下,有可能因为负载均衡等原因将某一个消息队列分配到了另外一个消费者中,因此在顺序消费情况下,集群模式下需要对消息队列加锁,当某个消息队列被锁定时,其他的消费者不能进行消费。

加锁的具体逻辑如下,首先获取当前消费者负责的所有消息队列MessageQueue,返回数据是一个MAP,key为broker名称,value为broker下的消息队列,接着对MAP进行遍历,处理每一个broker下的消息队列:
(1)根据broker名称查找broker的详细信息;
(2)创建加锁请求,在请求中设置要加锁的消息队列,将请求发送给broker,表示要对这些消息队列进行加锁;
(3)Broker返回请求处理结果,响应结果中包含了加锁成功的消息队列,对于加锁成功的消息队列将消息队列MessageQueue,将其对应的ProcessQueue中的locked属性置为true表示该消息队列已加锁成功,如果响应中未包含某个消息队列的信息,表示此消息队列加锁失败,需要将其对应的ProcessQueue对象中的locked属性置为false表示加锁失败;

顺序消息拉取

上面可知,在使用顺序消息时,定时任务会周期性的对当前消费者负责的消息队列进行加锁,不过由于负载均衡等原因,有可能给当前消费者分配了新的消息队列,此时还未来得及通过定时任务加锁,所以消费者在构建消息拉取请求前会再次进行判断,如果是新分配到当前消费者的消息队列,同样会向Broker发送请求,对MessageQueue进行加锁,加锁成功将其对应的ProcessQueue中的locked属性置为true才可以拉取消息。

顺序消息消费

消息拉取成功之后,会将消息提交到线程池中进行处理,对于顺序消费处理逻辑如下:

  1. 获取消息队列MessageQueue的对象锁,每个MessageQueue对应了一把Object对象锁,然后使用synchronized进行加锁,这里加锁的原因是因为顺序消费使用的是线程池,由多个线程同时进行消费,所以某个线程在处理某个消息队列的消息时需要对该消息队列MessageQueue加锁,防止其他线程并发消费该消息队列的锁,破坏消息的顺序性

    public class MessageQueueLock {
        private ConcurrentMap<MessageQueue, Object> mqLockTable = new ConcurrentHashMap<MessageQueue, Object>();
    
        public Object fetchLockObject(final MessageQueue mq) {
            // 获取消息队列对应的对象锁,也就是一个Object类型的对象
            Object objLock = this.mqLockTable.get(mq);
            // 如果获取为空
            if (null == objLock) {
                // 创建对象
                objLock = new Object();
                // 加入到Map中
                Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
                if (prevLock != null) {
                    objLock = prevLock;
                }
            }
            return objLock;
        }
    }
    
  2. 上一步获取锁成功之后,会再次校验该MessageQueue对应的ProcessQueue中的锁(locked状态),看是否过期或者已经失效,过期或者失效稍后会重新进行加锁;

  3. 获取ProcessQueue的中的consumeLock消费锁,获取成功之后调用消息监听器的consumeMessage方法开始消费消费;

    public class ProcessQueue {
       // 消息消费锁
       private final Lock consumeLock = new ReentrantLock();
    
       public Lock getConsumeLock() { // 获取消息消费锁
             return consumeLock;
       }
    }
    
  4. 消息消费完毕,释放ProcessQueueconsumeLock消费锁;

  5. 方法执行完毕,释放MessageQueue对应的Object对象锁;

在第1步中就已经获取了MessageQueue对应的Object对象锁对消息队列进行加锁了,那么为什么在第3步消费消息之前还要再加一个消费锁呢?

猜测有可能是在消费者进行负载均衡时,当前消费者负责的消息队列发生变化,可能移除某个消息队列,那么消费者在进行消费的时候就要获取ProcessQueueconsumeLock消费锁进行加锁,相当于锁住ProcessQueue,防止正在消费的过程中,ProcessQueue被负载均衡移除。

既然如此,负载均衡的时候为什么不使用MessageQueue对应的Object对象锁进行加锁而要使用ProcessQueue中的consumeLock消费锁?

这里应该是为了减小锁的粒度,因为消费者在MessageQueue对应的Object加锁后,还进行了一系列的判断,校验都成功之后获取ProcessQueue中的consumeLock加锁,之后开始消费消息,消费完毕释放所有的锁,如果负载均衡使用MessageQueueObject对象锁需要等待整个过程结束,锁的粒度较粗,这样显然会降低性能,而如果使用消息消费锁,只需要等待第3步和第4步结束就可以获取锁,减少等待的时间,而且消费者在进行消息消费前也会判断ProcessQueue是否被移除,所以只要保证consumeMessage方法在执行的过程中(消息被消费的过程)ProcessQueue不被移除即可。

总结

消费者端,是通过加锁来保证消息的顺序消费,一共有三把锁:

  1. 向Broker申请的消息队列锁
    集群模式下一个消息队列同一时刻只能被同一个消费组下的某一个消费者进行,为了避免负载均衡等原因引起的变动,消费者会向Broker发送请求对消息队列进行加锁,如果加锁成功,记录到消息队列对应的ProcessQueue中的locked变量中。

  2. 消息队列锁
    对应MessageQueue对应的Object对象锁,消费者在处理拉取到的消息时,由于可以开启多线程进行处理,所以处理消息前需要对MessageQueue加锁,锁住要处理的消息队列,主要是处理多线程之间的竞争,保证消息的顺序性。

  3. 消息消费锁
    对应ProcessQueue中的consumeLock,消费者在调用consumeMessage方法之前会加消费锁,主要是为了避免在消费消息时,由于负载均衡等原因,ProcessQueue被删除

对应的相关源码可参考:

【RocketMQ】【源码】顺序消息实现原理

与【RocketMQ】顺序消息实现总结相似的内容:

【RocketMQ】顺序消息实现总结

全局有序 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一般不使用。 局部有序 假设一个Topic分配了两个消息队列,生产者在发送消息的时候,可以对消息设置一个路由I

【RocketMQ】【源码】顺序消息实现原理

**全局有序** 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一般不使用。 ![](https://img2022.cnblogs.com/blog/2612945

详解RocketMQ 顺序消费机制

摘要:顺序消息是指对于一个指定的 Topic ,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发布的消息先消费,后发布的消息后消费。 本文分享自华为云社区《RocketMQ 顺序消费机制》,作者: 勇哥java实战分享 。 顺序消息是指对于一个指定的 Topic ,消息严格按照先进先

记一次RocketMQ消费非顺序消息引起的线上事故

应用场景 C端用户提交工单、工单创建完成之后、会发布一条工单创建完成的消息事件(异步消息)、MQ消费者收到消息之后、会通知各处理器处理该消息、各处理器处理完后都会发布一条将该工单写入搜索引擎的消息、最终该工单出现在搜索引擎、被工单处理人检索和处理。 事故异常体现 1、异常体现 从工单的流转记录发现、

【RocketMQ】RocketMQ存储结构设计

CommitLog 生产者向Broker发送的消息,会以顺序写的方式,写入CommitLog文件,CommitLog文件的根目录由配置参数storePathRootDir决定,默认每一个CommitLog的文件大小为1G,如果文件写满会新建一个CommitLog文件,以该文件中第一条消息的偏移量为文

【主流技术】聊一聊消息队列 RocketMQ 的基本结构与概念

RocketMQ 是阿里巴巴在 2012 年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于 2017 年 9 月 25 日成为 Apache 的顶级项目。 作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来...

RocketMQ 事件驱动:云时代的事件驱动有啥不同?

本文深入探讨了云时代 EDA 的新内涵及它在云时代再次流行的主要驱动力,包括技术驱动力和商业驱动力,随后重点介绍了 RocketMQ 5.0 推出的子产品 EventBridge,并通过几个云时代事件驱动的典型案例,进一步叙述了云时代事件驱动的常见场景和最佳实践。

RocketMq开启安全认证ACL-解决服务器系统安全漏洞

1、为什么要开启ACL 通过之前的文章我们已经知道怎么安装RocketMq了。如果你还不会安装RocketMq可以查看我的这篇文章:快速入门一篇搞定RocketMq-实现微服务实战落地 进行软件安装,附文章地址:https://www.cnblogs.com/sowler/p/18173752 。虽

如何实现一个简单易用的 RocketMQ SDK

2018 年,做为架构负责人,接到一个架构需求:实现一个简单易用的 RocketMQ SDK 。 因为各个团队 RocketMQ 原生客户端配置起来千奇百怪,有的配置存在风险,各团队负责人都需要一个简洁易用的 RocketMQ SDK 。 我立马调研相关开源的方案,当时 RocketMQ-Sprin

RocketMQ 之 IoT 消息解析:物联网需要什么样的消息技术?

前言: 从初代开源消息队列崛起,到 PC 互联网、移动互联网爆发式发展,再到如今 IoT、云计算、云原生引领了新的技术趋势,消息中间件的发展已经走过了 30 多个年头。 目前,消息中间件在国内许多行业的关键应用中扮演着至关重要的角色。随着数字化转型的深入,客户在使用消息技术的过程中往往同时涉及交叉场