RocketMQ - 消费者进度保存机制

rocketmq,消费者,进度,保存,机制 · 浏览次数 : 169

小编点评

**RemoteBrokerOffsetStore.java** ```java public class RemoteBrokerOffsetStore implements OffsetStore { // ... } ``` **LocalFileOffsetStore.java** ```java public class LocalFileOffsetStore implements OffsetStore { // ... } ``` **Offset store interface** ```java public interface OffsetStore { // ... } ``` **实现方法** ```java // ... } ``` **使用示例** ```java // ... // Get the message queue MessageQueue mq = ...; // Set the offset store OffsetStore offsetStore = new RemoteBrokerOffsetStore(); // Load offset information offsetStore.load(); // Update the offset offsetStore.updateOffset(mq, 100, true); // ... ```

正文

RocketMQ设计了远程位点管理和本地位点管理两种位点管理方式。集群消费时,位点由客户端提交给Broker保存,具体实现代码在RemoteBrokerOffsetStore.java文件中;广播消费时,位点保存在消费者本地磁盘上,实现代码在LocalFileOffsetStore.java文件中


/**
 * Offset store interface
 */
public interface OffsetStore {
    /**
     * 加载位点信息
     */
    void load() throws MQClientException;

    /**
     * 更新缓存位点信息
     */
    void updateOffset(final MessageQueue mq, final long offset, final boolean increaseOnly);

    /**
     * 读取本地位点信息
     *
     * @return The fetched offset
     */
    long readOffset(final MessageQueue mq, final ReadOffsetType type);

    /**
     * 持久化全部队列的位点信息
     */
    void persistAll(final Set<MessageQueue> mqs);

    /**
     * 持久化某一个队列的位点信息
     */
    void persist(final MessageQueue mq);

    /**
     * 删除某一个队列的位点信息
     */
    void removeOffset(MessageQueue mq);

    /**
     * 复制一份缓存位点信息
     * @return The cloned offset table of given topic
     */
    Map<MessageQueue, Long> cloneOffsetTable(String topic);

    /**
     * 将本地消费位点持久化到Broker中
     * @param mq
     * @param offset
     * @param isOneway
     */
    void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
        MQBrokerException, InterruptedException, MQClientException;
}

客户端消费进度保存也叫消费进度持久化,开源RocketMQ 4.2.0支持定时持久化和不定时持久化两种方式

定时持久化位点实现方法是org.apache.rocketmq.client.impl.factory.MQClientInstance.startScheduledTask()

定时持久化位点逻辑是通过定时任务来实现的,在启动程序10s后,会定时调用持久化方法MQClientInstance.this.persistAllConsumerOffset(),持久化每一个消费者消费的每一个MessageQueue的消费进度。

不定时持久化也叫Pull-And-Commit,也就是在执行Pull方法的同时,把队列最新消费位点信息发给Broker,具体实现代码在org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.pullMessage()方法中

该方法中有两处持久化位点信息
第一处,在拉取完成后,如果拉取位点非法,则此时客户端会主动提交一次最新的消费位点信息给Broker,以便下次能使用正确的位点拉取消息,该处更新位点信息

第二处,在执行消息拉取动作时,如果是集群消费,并且本地位点值大于0,那么把最新的位点上传给Broker

代码中通过commitOffsetEnable、sysFlag两个字段表示是否可以上报消费位点给Broker。在执行Pull请求时,将sysFlag作为网络请求的消息头传递给Broker,Broker中处理该字段的逻辑在org.apache.rocketmq.broker.processor.PullMessageProcessor.processRequest()方法中

hasCommitOffsetFlag:Pull请求中的sysFlag参数,是决定Broker是否执行持久化消费位点的一个因素。
brokerAllowSuspend:Broker是否能挂起。如果Broker是挂起状态,将不能持久化位点。
storeOffsetEnable:True表示Broker需要持久化消费位点,False则不用持久化位点。

与RocketMQ - 消费者进度保存机制相似的内容:

RocketMQ - 消费者进度保存机制

RocketMQ设计了远程位点管理和本地位点管理两种位点管理方式。集群消费时,位点由客户端提交给Broker保存,具体实现代码在RemoteBrokerOffsetStore.java文件中;广播消费时,位点保存在消费者本地磁盘上,实现代码在LocalFileOffsetStore.java文件中

【RocketMQ】【源码】延迟消息实现原理

RocketMQ设定了延迟级别可以让消息延迟消费,延迟消息会使用SCHEDULE_TOPIC_XXXX这个主题,每个延迟等级对应一个消息队列,并且与普通消息一样,会保存每个消息队列的消费进度(delayOffset.json中的offsetTable): public class MessageSt

【RocketMQ】【源码】顺序消息实现原理

**全局有序** 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一般不使用。 ![](https://img2022.cnblogs.com/blog/2612945

【RocketMQ】顺序消息实现总结

全局有序 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一般不使用。 局部有序 假设一个Topic分配了两个消息队列,生产者在发送消息的时候,可以对消息设置一个路由I

RocketMQ - 消费者消费方式

RocketMQ的消费方式包含Pull和Push两种 Pull方式:用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定要求。 在 RocketMQ 中org.apache.ro

【RocketMQ】【源码】主从模式下的消费进度管理

在[【RocketMQ】消息的拉取](https://www.cnblogs.com/shanml/p/16513229.html)一文中可知,消费者在启动的时候,会创建消息拉取API对象`PullAPIWrapper`,调用pullKernelImpl方法向Broker发送拉取消息的请求,那么在主

RocketMQ - 消费者概述

消费流程 消费者组: 一个逻辑概念,在使用消费者时需要指定一个组名。一个消费者组可以订阅多个Topic。 消费者实例: 一个消费者组程序部署了多个进程,每个进程都可以称为一个消费者实例。 订阅关系: 一个消费者组订阅一个 Topic 的某一个 Tag,这种记录被称为订阅关系。RocketMQ规定消费

【RocketMQ】Rebalance负载均衡总结

消费者负载均衡,是指为消费组下的每个消费者分配订阅主题下的消费队列,分配了消费队列消费者就可以知道去消费哪个消费队列上面的消息,这里针对集群模式,因为广播模式,所有的消息队列可以被消费组下的每个消费者消费不涉及负载均衡,而集群模式一个消息队列同一时间只能分配给组内的一个消费者进行消费。 Rocket

【RocketMQ】【源码】负载均衡源码分析

RocketMQ在集群模式下,同一个消费组内,一个消息队列同一时间只能分配给组内的某一个消费者,也就是一条消息只能被组内的一个消费者进行消费,为了合理的对消息队列进行分配,于是就有了负载均衡。 ![img](https://img2022.cnblogs.com/blog/2612945/20220

【RocketMQ】消息的拉取总结

在上一讲中,介绍了消息的存储,生产者向Broker发送消息之后,数据会写入到CommitLog中,这一讲,就来看一下消费者是如何从Broker拉取消息的。 RocketMQ消息的消费以组为单位,有两种消费模式: 广播模式:同一个消息队列可以分配给组内的每个消费者,每条消息可以被组内的消费者进行消费。