RocketMQ - 消费者启动机制

rocketmq,消费者,启动,机制 · 浏览次数 : 206

小编点评

# DefaultMQPullConsumerImpl.start()方法的启动过程 DefaultMQPullConsumerImpl的start()方法是一个复杂的流程,包含11个步骤。以下是一个详细的介绍: **1. 创建状态** DefaultMQPullConsumerImpl的状态为ServiceState.CREATE_JUST。 **2. 设置默认启动状态** 消费者的默认启动状态为失败。 **3. 检查配置** 检查消费者实例名,如果是默认的名字,则更改为当前的程序进程id。 **4. 获取MQClientInstance** 获取一个 MQClientInstance实例,如果 MQClientInstance已经初始化,则直接返回已初始化的实例。 **5. 设置Rebalance对象** 设置Rebalance对象消费者组、消费类型、Queue分配策略、MQClientInstance等参数。 **6. 初始化位点管理器** 并加载位点信息。位点管理器分为本地管理和远程管理两种,集群消费时消费位点保存在 Broker 中,由远程管理器管理;广播消费时位点存储在本地,由本地管理器管理。 **7. 本地注册消费者实例** 如果注册成功,则表示消费者启动成功。 **8. 启动MQClientInstance实例** 启动MQClientInstance实例DefaultMQPushConsumer的启动过程与DefaultMQPullConsumer的启动过程类似,用户也是通过构造函数初始化,依次调用DefaultMQPushConsumer的start方法和其内部实现类DefaultMQPushConsumerImpl的start()方法,开启整个启动过程的。 **9. 更新本地订阅关系和路由信息** 通过 Broker 检查是否支持消费者的过滤类型;向集群中的所有Broker发送消费者组的心跳信息。 **10. 立即执行一次Rebalance** 归纳总结以上内容,生成内容时需要带简单的排版

正文

RocketMQ客户端中有两个独立的消费者实现类:org.apache.rocketmq.client.consumer.DefaultMQPullConsumer 和 org.apache.rocketmq.client.consumer.DefaultMQPushConsumer

DefaultMQPullConsumer

2022年已弃用,使用 DefaultLitePullConsumer 代替

该消费者使用时需要用户主动从 Broker 中 Pull 消息和消费消息,提交消费位点。DefaultMQPullConsumer的类图继承关系
image

核心属性:

namesrvAddr: 继承自 ClientConfig,表示 RocketMQ 集群的Namesrv 地址,如果是多个则用分号分开。比如:127.0.0.1:9876;127.0.0.2:9876。

clientIP: 使用的客户端程序所在机器的 IP地址。支持 IPv4和IPv6,IPv4 排除了本地的环回地址(127.0.xxx.xxx)和私有内网地址(192.168.xxx.xxx)。这里需要注意的是,如果 Client 运行在Docker 容器中,获取的 IP 地址是容器所在的 IP 地址,而非宿主机的IP地址。

instanceName: 实例名,每个实例都需要取唯一的名字,因为有时我们会在同一个机器上部署多个程序进程,如果名字有重复就会导致启动失败。

vipChannelEnabled: 这是一个 boolean 值,表示是否开启 VIP通道。VIP 通道和非VIP通道的区别是:在通信过程中使用的端口号不同。

clientCallbackExecutorThreads: 客户端回调线程数。该参数表示 Netty 通信层回调线程的个数 ,默认值 Runtime.getRuntime().availableProcessors()表示当前CPU的有效个数。

pollNameServerInterval: 获取 Topic 路由信息的间隔时长,单位为 ms,默认为30 000ms。

heartbeatBrokerInterval: 与Broker心跳间隔的时长,单位为 ms,默认为30 000ms。

persistConsumerOffsetInterval: 持久化消费位点时间间隔,单位为 ms,默认为5000ms。

defaultMQPullConsumerImpl: 默认Pull消费者的具体实现。

consumerGroup: 消费者组名字。

brokerSuspendMaxTimeMillis: 在长轮询模式下,Broker的最大挂起请求时间,建议不要修改此值。

consumerTimeoutMillisWhenSuspend: 在长轮询模式下,消费者的最大请求超时时间,必须比brokerSuspendMaxTimeMillis大,不建议修改。

consumerPullTimeoutMillis: 消费者Pull消息时Socket的超时时间。

messageModel: 消费模式,现在支持集群模式消费和广播模式消费。

messageQueueListener: 消息路由信息变化时回调处理监听器,一般在重新平衡时会被调用。

offsetStore: 位点存储模块。集群模式位点会持久化到Broker中,广播模式持久化到本地文件中,位点存储模块有两个实现类:RemoteBrokerOffsetStore 和LocalFileOffsetStore。

allocateMessageQueueStrategy: 消费Queue分配策略管理器。

maxReconsumeTimes: 最大重试次数,可以配置。

核心方法

registerMessageQueueListener():注册队列变化监听器,当队列发生变化时会被监听到。

pull():从Broker中Pull消息,如果有PullCallback参数,则表示异步拉取。

pullBlockIfNotFound():长轮询方式拉取。如果没有拉取到消息,那么Broker会将请求Hold住一段时间。

updateConsumeOffset ( final MessageQueue mq , final long offset):更新某一个Queue的消费位点。

fetchConsumeOffset(final MessageQueue mq,final boolean fromStore):查找某个Queue的消费位点。

sendMessageBack(MessageExt msg,int delayLevel,String brokerName,String consumerGroup):如果消费发送失败,则可以将消息重新发回给 Broker,这个消费者组延迟一段时间后可以再消费(也就是重试)。

fetchSubscribeMessageQueues(final String topic):获取一个Topic的全部Queue信息。

DefaultMQPushConsumer
DefaultMQPushConsumer的大部分属性、方法和DefaultMQPullConsumer是一样的。下面介绍一下DefaultMQPushConsumer的核心属性和方法

package org.apache.rocketmq.common.consumer;

/**
 * 一个枚举,表示从什么位点开始消费。
 */
public enum ConsumeFromWhere {
    //从上次消费的位点开始消费,相当于断点继续
    CONSUME_FROM_LAST_OFFSET,

    //4.2.0不支持,处理同 CONSUME_FROM_LAST_OFFSET
    @Deprecated
    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
    //4.2.0 不支持,处理同 CONSUME_FROM_LAST_OFFSET
    @Deprecated
    CONSUME_FROM_MIN_OFFSET,
    //4.2.0 不支持,处理同 CONSUME_FROM_LAST_OFFSET
    @Deprecated
    CONSUME_FROM_MAX_OFFSET,
    //从ConsumeQueue的最小位点开始消费
    CONSUME_FROM_FIRST_OFFSET,
    //从指定时间开始消费
    CONSUME_FROM_TIMESTAMP,
}

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    private final InternalLogger log = ClientLogger.getLog();

    /**
     * 默认的Push消费者具体实现类
     */
    protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

    /**
     * 消费者组名字
     */
    private String consumerGroup;

    /**
     * 消费模式,现在支持集群模式消费和广播模式消费
     */
    private MessageModel messageModel = MessageModel.CLUSTERING;

    /**
     * 一个枚举,表示从什么位点开始消费
     */
    private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;

    /**
     * 表示从哪一时刻开始消费,格式为*yyyyMMDDHHmmss,默认为半小时前。
     * 当*consumeFromWhere=CONSUME_FROM_TIMESTAMP时,consumeTimestamp设*置的值才生效
     */
    private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));

    /**
     * 消费者订阅topic-queue策略
     */
    private AllocateMessageQueueStrategy allocateMessageQueueStrategy;

    /**
     * 订阅关系,表示当前消费者订阅了哪些Topic的哪些 Tag
     */
    private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();

    /**
     * 消息Push回调监听器
     */
    private MessageListener messageListener;

    /**
     * Offset Storage
     */
    private OffsetStore offsetStore;

    /**
     *  最小消费线程数,必须小于consumeThreadMax
     */
    private int consumeThreadMin = 20;

    /**
     * 最大线程数,必须大于consumeThreadMin
     */
    private int consumeThreadMax = 20;

    /**
     * 动态调整消费线程池的线程数大小,开源版本不支持该功能
     */
    private long adjustThreadPoolNumsThreshold = 100000;

    /**
     * 并发消息的最大位点差。如果Pull消息的位点差超过该值,拉取变慢
     */
    private int consumeConcurrentlyMaxSpan = 2000;

    /**
     * 一个 Queue 能缓存的最大消息数。超过该值则采取拉取流控措施。
     */
    private int pullThresholdForQueue = 1000;

    /**
     * 一个Queue最大能缓存的消息字节数,单位是MB
     */
    private int pullThresholdSizeForQueue = 100;

    /**
     * 一个Topic最大能缓存的消息数。超过该值则采取拉取流控措施。
     * 该字段默认值是-1,该值根据pullThresholdForQueue的配置决定是否生效,pullThresholdForTopic的优先级低于pullThresholdForQueue
     */
    private int pullThresholdForTopic = -1;

    /**
     * 一个Topic最大能缓存的消息字节数,单位是MB。
     * 默认为-1,结合 pullThresholdSizeForQueue 配置项生效,该配置项的优先级低于pullThresholdSizeForQueue
     */
    private int pullThresholdSizeForTopic = -1;

    /**
     * 拉取间隔,单位为ms
     */
    private long pullInterval = 0;

    /**
     * 消费者每次批量消费时,最多消费多少条消息
     */
    private int consumeMessageBatchMaxSize = 1;

    /**
     * 一次最多拉取多少条消息
     */
    private int pullBatchSize = 32;

    /**
     * 每次拉取消息时是否更新订阅关系,该方法的返回值默认为False
     */
    private boolean postSubscriptionWhenPull = false;

    /**
     * Whether the unit of subscription group
     */
    private boolean unitMode = false;

    /**
     * 最大重试次数,该函数返回值默认为-1,表示默认最大重试次数为16
     */
    private int maxReconsumeTimes = -1;

    /**
     * 为短轮询场景设置的挂起时间,比如顺序消息场景
     */
    private long suspendCurrentQueueTimeMillis = 1000;

    /**
     * 消费超时时间,单位为min,默认值为15min
     */
    private long consumeTimeout = 15;

    /**
     * Maximum time to await message consuming when shutdown consumer, 0 indicates no await.
     */
    private long awaitTerminationMillisWhenShutdown = 0;

    /**
     * Interface of asynchronous transfer data
     */
    private TraceDispatcher traceDispatcher = null;
}

业务代码通常使用构造函数初始化一个DefaultMQPullConsumer实例,设置各种参数,比如Namesrv地址、消费者组名等。然后调用start()方法启动defaultMQPullConsumerImpl实例。我们这里主要讲 defaultMQPullConsumerImpl.start()方法中的启动过程,具体步骤
image

第一步: 最初创建defaultMQPullConsumerImpl时的状态为 ServiceState.CREATE_JUST,然后设置消费者的默认启动状态为失败。
第二步: 检查消费者的配置比,如消费者组名、消费类型、Queue 分配策略等参数是否符合规范;将订阅关系数据发给Rebalance服务对象。
第三步: 校验消费者实例名,如果是默认的名字,则更改为当前的程序进程id。
第四步: 获取一个 MQClientInstance,如果 MQClientInstance已经初始化,则直接返回已初始化的实例。这是核心对象,每个clientId缓存一个实例。
第五步: 设置Rebalance对象消费者组、消费类型、Queue分配策略、MQClientInstance等参数。
第六步: 对 Broker API 的封装类 pullAPIWrapper进行初始化,同时注册消息,过滤filter。
第七步: 初始化位点管理器,并加载位点信息。位点管理器分为本地管理和远程管理两种,集群消费时消费位点保存在 Broker 中,由远程管理器管理;广播消费时位点存储在本地,由本地管理器管理。
第八步: 本地注册消费者实例,如果注册成功,则表示消费者启动成功。
第九步: 启动MQClientInstance实例

DefaultMQPushConsumer的启动过程与DefaultMQPullConsumer的启动过程类似,用户也是通过构造函数初始化,依次调用DefaultMQPushConsumer的start方法和其内部实现类DefaultMQPushConsumerImpl的start()方法,开启整个启动过程的。

DefaultMQPushConsumer的启动过程分为11个步骤。
image
第一~七步: 与DefaultMQPullConsumer的步骤类似
第八步: 初始化消费服务并启动。之所以用户“感觉”消息是Broker 主动推送给自己的,是因为DefaultMQPushConsumer通过Pull服务将消息拉取到本地,再通过Callback的 形 式,将本地消息Push给用户的消费代码。DefaultMQPushConsumer 与DefaultMQPullConsumer获取消息的方式一样,本质上都是拉取
第九步: 启动MQClientInstance实例。
第十步: 更新本地订阅关系和路由信息;通过 Broker 检查是否支持消费者的过滤类型;向集群中的所有Broker发送消费者组的心跳信息。
第十一步: 立即执行一次Rebalance

与RocketMQ - 消费者启动机制相似的内容:

RocketMQ - 消费者启动机制

RocketMQ客户端中有两个独立的消费者实现类:org.apache.rocketmq.client.consumer.DefaultMQPullConsumer 和 org.apache.rocketmq.client.consumer.DefaultMQPushConsumer Default

【RocketMQ】【源码】主从模式下的消费进度管理

在[【RocketMQ】消息的拉取](https://www.cnblogs.com/shanml/p/16513229.html)一文中可知,消费者在启动的时候,会创建消息拉取API对象`PullAPIWrapper`,调用pullKernelImpl方法向Broker发送拉取消息的请求,那么在主

RocketMQ - 消费者概述

消费流程 消费者组: 一个逻辑概念,在使用消费者时需要指定一个组名。一个消费者组可以订阅多个Topic。 消费者实例: 一个消费者组程序部署了多个进程,每个进程都可以称为一个消费者实例。 订阅关系: 一个消费者组订阅一个 Topic 的某一个 Tag,这种记录被称为订阅关系。RocketMQ规定消费

RocketMQ - 消费者Rebalance机制

客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、Topic 扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡,以支持全部队列的正常消费的呢? RebalancePullImpl 和 RebalancePushImpl 两个重平衡实现类,分别被 Defa

RocketMQ - 消费者进度保存机制

RocketMQ设计了远程位点管理和本地位点管理两种位点管理方式。集群消费时,位点由客户端提交给Broker保存,具体实现代码在RemoteBrokerOffsetStore.java文件中;广播消费时,位点保存在消费者本地磁盘上,实现代码在LocalFileOffsetStore.java文件中

RocketMQ - 消费者消费方式

RocketMQ的消费方式包含Pull和Push两种 Pull方式:用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定要求。 在 RocketMQ 中org.apache.ro

RocketMQ消费者是如何负载均衡的

摘要:RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。 本文分享自华为云社区《一文讲透RocketMQ消费者是如何负载均衡的》,作者:勇哥java实战分享。 RocketMQ 支持两种消息模式:集群消费( Clustering )和

RocketMQ - 生产者最佳实践总结

相对消费者而言,生产者的使用更加简单,一般关注消息类型、消息发送方法和发送参数,即可正常使用RocketMQ发送消息 常用消息类型 | 消息类型 | 优点 | 缺 点 | 备注 | | | | | | | 普通消息(并发消息) | 性能最好。单机TPS的级别为100 000 | 消息的生产和消费都无

【RocketMQ】Rebalance负载均衡总结

消费者负载均衡,是指为消费组下的每个消费者分配订阅主题下的消费队列,分配了消费队列消费者就可以知道去消费哪个消费队列上面的消息,这里针对集群模式,因为广播模式,所有的消息队列可以被消费组下的每个消费者消费不涉及负载均衡,而集群模式一个消息队列同一时间只能分配给组内的一个消费者进行消费。 Rocket

【RocketMQ】消息的消费总结

消费者从Broker拉取到消息之后,会将消息提交到线程池中进行消费,RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求ConsumeRequest将消费请求提交到线程池处理,否则需要分批构建进行提交。 消息消费 在消息被提交到线程池后进行处理时,会调