RocketMQ - 消费者消费方式

rocketmq,消费者,消费,方式 · 浏览次数 : 219

小编点评

**RocketMQ Pull 和 Push 消费者实现** **Pull 消费者** 1. 使用 `fetchSubscribeMessageQueues()` 方法拉取所有可消费的 Queue。 2. 遍历所有 Queue,拉取每个 Queue 的消息。 3. 执行用户编写的消费代码。 4. 保存消费进度。 **Push 消费者** 1. 创建 `DefaultMQPushConsumer` 实例并启动 Pull 服务。 2. 从 `pullRequestQueue` 中获取消息并执行拉取。 3. 保存消费进度。 4. 处理消费失败的消息。 5. 拉取消息并消费。 6. 保存消费结果。 **注意** * Pull 消费者需要从代码层面精准地控制消费,否则可能会影响消费效率。 * Push 消费者使用简化的位点管理,可以灵活地设置消息消费模式。 * 对于并发消费,Pull 消费者需要计算本地缓存队列中第一个和最后一个消息的 offset 之间的差值,以判断是否消费过慢。

正文

RocketMQ的消费方式包含Pull和Push两种

  • Pull方式:用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定要求。 在 RocketMQ 中org.apache.rocketmq.client.consumer.DefaultMQPullConsumer 是默认的Pull消费者实现类。

  • Push 方式:代码接入非常简单,适合大部分业务场景。缺点是灵活度差,在了解其消费原理后,排查消费问题方可简单快捷。在RocketMQ 中org.apache.rocketmq.client.consumer.DefaultMQPushConsumer 是默认的Push消费者实现类

消费方式/对比项 Pull Push 备注
是否需要主动拉取 理解分区后,需要主动拉取各个分区消息 自动 Pull 消息灵活;Push 使用更简单
位点管理 用户自行管理或者主动提交给 Broker 管理 Broker 管理 Pull自主管理位点,消费灵活;
Push 位点交由 Broker 管理
Topic 路由变更是否影响消费 Pull模式需要编码实现路由感知
Push 模式自动执行 Rebalance,以适应路由变更

Pull消费流程

image
第一步: fetchSubscribeMessageQueues(StringTopic)。拉取全部可以消费的Queue。如果某一个Broker下线,这里也可以实时感知到。
第二步: 遍历全部Queue,拉取每个Queue可以消费的消息。
第三步: 如果拉取到消息,则执行用户编写的消费代码。
第四步: 保存消费进度。消费进度可以执行updateConsumeOffset()方法,将消费位点上报给Broker,也可以自行保存消费位点。比如流计算平台Flink使用Pull方式拉取消息消费,通过Checkpoint管理消费进度

Push消费流程

image
第一步: 初始化Push消费者实例。业务代码初始化DefaultMQPushConsumer实例,启动Pull服务PullMessageService。该服务是一个线程服务,不断执行run()方法拉取已经订阅Topic的全部队列的消息,将消息保存在本地的缓存队列中。
第二步: 消费消息。由消费服务ConsumeMessageConcurrentlyService或者ConsumeMessageOrderlyService将本地缓存队列中的消息不断放入消费线程池,异步回调业务消费代码,此时业务代码可以消费消息。
第三步: 保存消费进度。业务代码消费后,将消费结果返回给消费服务,再由消费服务将消费进度保存在本地,由消费进度管理服务定时和不定时地持久化到本地(LocalFileOffsetStore支持)或者远程Broker(RemoteBrokerOffsetStore支持)中。对于消费失败的消息,RocketMQ客户端处理后发回给Broker,并告知消费失败

Push消费者如何拉取消息消费
image
第一步:PullMessageService 不断拉取消息。如下源代码是PullMessageService.run()方法,pullRequestQueue 中保存着待拉取的 Topic 和 Queue 信息,程序不断从pullRequestQueue中获取PullRequest并执行拉取消息方法。

第二步:消费者拉取消息并消费
(1)基本校验。校验ProcessQueue是否dropped;校验消费者服务状态是否正常;校验消费者是否被挂起。
(2)拉取条数、字节数限制检查。如果本地缓存消息数量大于配置的最大拉取条数(默认为1000,可以调整),则延迟一段时间再拉取;如果本地缓存消息字节数大于配置的最大缓存字节数,则延迟一段时间再拉取。这两种校验方式都相当于本地流控
(3)并发消费和顺序消费校验,在并发消费时,processQueue.getMaxSpan()方法是用于计算本地缓存队列中第一个消息和最后一个消息的offset差值
image
本地缓存队列的Span如果大于配置的最大差值(默认为2000,可以调整),则认为本地消费过慢,需要执行本地流控

顺序消费时,如果当前拉取的队列在 Broker 端没有被锁定,说明已经有拉取正在执行,当前拉取请求晚点执行;如果不是第一次拉取,需要先计算最新的拉取位点并修正本地最新的待拉取位点信息,再执行拉取
(1)订阅关系校验。如果待拉取的Topic在本地缓存中订阅关系为空,则本地拉取不执行,待下一个正常心跳或者Rebalance后订阅关系恢复正常,方可正常拉取。
(2)封装拉取请求和拉取后的回调对象 PullCallback。这里主要将消息拉取请求和拉取结果处理封装成 PullCallback,并通过调用PullAPIWrapper.pullKernelImpl()方法将拉取请求发出去

ConsumeMessageService 是一个通用消费服务接口


public interface ConsumeMessageService {
    /**
     * 启动服务时使用
     */
    void start();

    /**
     * 关闭服务时使用
     * @param awaitTerminateMillis
     */
    void shutdown(long awaitTerminateMillis);

    /**
     * 更新消费线程池的核心线程数。
     * @param corePoolSize
     */
    void updateCorePoolSize(int corePoolSize);

    /**
     * 增加一个消费线程池的核心线程数。
     */
    void incCorePoolSize();

    /**
     * 减少一个消费线程池的核心线程数
     */
    void decCorePoolSize();

    /**
     * 获取消费线程池的核心线程数
     * @return
     */
    int getCorePoolSize();

    /**
     * 如果一个消息已经被消费过了,但是还想再消费一次,就需要实现这个方法
     * @param msg
     * @param brokerName
     * @return
     */
    ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg, final String brokerName);

    /**
     * 将消息封装成线程池任务,提交给消费服务,消费服务再将消息传递给业务消费进行处理
     * @param msgs
     * @param processQueue
     * @param messageQueue
     * @param dispathToConsume
     */
    void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume);
}

消费消息主要分为消费前预处理、消费回调、消费结果统计、消费结果处理4个步骤
第一步:消费执行前进行预处理。执行消费前的hook和重试消息预处理。消费前的hook可以理解为消费前的消息预处理(比如消息格式校验)。如果拉取的消息来自重试队列,则将Topic名重置为原来的Topic名,而不用重试Topic名。
第二步:消费回调。首先设置消息开始消费时间为当前时间,再将消息列表转为不可修改的List,然后通过listener.consumeMessage(Collections.unmodifiableList(msgs),context)方法将消息传递给用户编写的业务消费代码进行处理。
第三步:消费结果统计和执行消费后的hook。客户端原生支持基本消费指标统计,比如消费耗时;消费后的hook和消费前的hook要一一对应,用户可以用消费后的hook统计与自身业务相关的指标。
第四步:消费结果处理。包含消费指标统计、消费重试处理和消费位点处理。消费指标主要是对消费成功和失败的TPS的统计;消费重试处理主要将消费重试次数加1;消费位点处理主要根据消费结果更新消费位点记录

顺序消息的 ConsumeRequest 中并没有保存需要消费的消息,在顺序消费时通过调用ProcessQueue.takeMessags()方法获取需要消费的消息,而且消费也是同步进行的
msgTreeMap:是一个TreeMap<Long,MessageExt>类型,key是消息物理位点值,value是消息对象,该容器是ProcessQueue用来缓存本地顺序消息的,保存的数据是按照key(就是物理位点值)顺序排列的。
consumingMsgOrderlyTreeMap:是一个TreeMap<Long,MessageExt>类型,key是消息物理位点值,Value是消息对象,保存当前正在处理的顺序消息集合,是msgTreeMap的一个子集。保存的数据是按照key(就是物理位点值)顺序排列的。
batchSize:一次从本地缓存中获取多少条消息回调给用户消费

与RocketMQ - 消费者消费方式相似的内容:

RocketMQ - 消费者消费方式

RocketMQ的消费方式包含Pull和Push两种 Pull方式:用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定要求。 在 RocketMQ 中org.apache.ro

RocketMQ - 消费者进度保存机制

RocketMQ设计了远程位点管理和本地位点管理两种位点管理方式。集群消费时,位点由客户端提交给Broker保存,具体实现代码在RemoteBrokerOffsetStore.java文件中;广播消费时,位点保存在消费者本地磁盘上,实现代码在LocalFileOffsetStore.java文件中

【RocketMQ】【源码】顺序消息实现原理

**全局有序** 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一般不使用。 ![](https://img2022.cnblogs.com/blog/2612945

【RocketMQ】顺序消息实现总结

全局有序 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一般不使用。 局部有序 假设一个Topic分配了两个消息队列,生产者在发送消息的时候,可以对消息设置一个路由I

【RocketMQ】RocketMQ 5.0新特性(二)- Pop消费模式

Pop模式消费和消息粒度负载均衡 在RocketMQ 5.0之前,消费有两种方式可以从Broker获取消息,分别为Pull模式和Push模式。 Pull模式:消费需要不断的从阻塞队列中获取数据,如果没有数据就等待,这个阻塞队列中的数据由消息拉取线程从Broker拉取消息之后加入的,所以Pull模式下

【RocketMQ】【源码】消息拉模式分析

RocketMQ有两种获取消息的方式,分别为推模式和拉模式。 **推模式** 推模式在[【RocketMQ】消息的拉取](https://www.cnblogs.com/shanml/p/16463964.html)一文中已经讲过,虽然从名字上看起来是消息到达Broker后推送给消费者,实际上还是需

RocketMQ - 生产者最佳实践总结

相对消费者而言,生产者的使用更加简单,一般关注消息类型、消息发送方法和发送参数,即可正常使用RocketMQ发送消息 常用消息类型 | 消息类型 | 优点 | 缺 点 | 备注 | | | | | | | 普通消息(并发消息) | 性能最好。单机TPS的级别为100 000 | 消息的生产和消费都无

【RocketMQ】【源码】主从模式下的消费进度管理

在[【RocketMQ】消息的拉取](https://www.cnblogs.com/shanml/p/16513229.html)一文中可知,消费者在启动的时候,会创建消息拉取API对象`PullAPIWrapper`,调用pullKernelImpl方法向Broker发送拉取消息的请求,那么在主

可重入锁思想,设计MQ迁移方案

如果你的MQ消息要从Kafka切换到RocketMQ且不停机,怎么做?在让这个MQ消息调用第三方发奖接口,但无幂等字段又怎么处理?今天小傅哥就给大家分享一个关于MQ消息在这样的场景中的处理手段。 这是一种比较特例的场景,需要保证切换的MQ消息不被两端同时消费,并且还需要在一段消费失败后的MQ还可以继

【RocketMQ】RocketMQ存储结构设计

CommitLog 生产者向Broker发送的消息,会以顺序写的方式,写入CommitLog文件,CommitLog文件的根目录由配置参数storePathRootDir决定,默认每一个CommitLog的文件大小为1G,如果文件写满会新建一个CommitLog文件,以该文件中第一条消息的偏移量为文