RocketMQ - 消费者Rebalance机制

rocketmq,消费者,rebalance,机制 · 浏览次数 : 87

小编点评

**RebalanceImpl 的核心属性和方法** **核心属性:** * `log`: 记录消息队列和进程队列的关系。 * `processQueueTable`: 保存所有 Topic 和 MessageQueue 的映射。 * `topicSubscribeInfoTable`: 保存当前消费者组订阅了哪些 Topic 的哪些 Tag。 * `subscriptionInner`: 保存每个 Topic 的 SubscriptionData。 **核心方法:** * `lock(final MessageQueue mq)`: 加锁 MessageQueue。 * `doRebalance(final boolean isOrder)`: 执行 Rebalance 操作。 * `messageQueueChanged(final String topic, final Set<MessageQueue> mqAll, final Set<MessageQueue> mqDivided)`: 通知消息队列发生变化。 * `removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq)`: 在 Rebalance 中移除不再需要的 MessageQueue。 * `dispatchPullRequest(final List<PullRequest> pullRequestList)`: 在 Rebalance 中处理拉取请求。 * `updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder)`: 在 Rebalance 中更新 processQueue。 **其他方法:** * `lock`: 确保 Rebalance 操作的线程安全。 * `doRebalance`: 执行 Rebalance 操作。 * `messageQueueChanged`: 通知消息队列发生变化。 * `removeUnnecessaryMessageQueue`: 在 Rebalance 中移除不再需要的 MessageQueue。 * `dispatchPullRequest`: 处理拉取请求。 * `updateProcessQueueTableInRebalance`: 更新 processQueue。

正文

客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、Topic 扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡,以支持全部队列的正常消费的呢?

RebalancePullImpl 和 RebalancePushImpl 两个重平衡实现类,分别被 DefaultMQPullConsumer 和DefaultMQPushConsumer 使用。下面讲一下 Rebalancelmpl 的核心属性和方法
image

核心属性

public abstract class RebalanceImpl {
    protected static final InternalLogger log = ClientLogger.getLog();
    //记录MessageQueue和ProcessQueue的关系。MessageQueue可以简单地理解为ConsumeQueue的客户端实现;ProcessQueue是保存Pull消息的本地容器
    protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64);
    //Topic 路由信息 。保存 Topic 和 MessageQueue的关系。
    protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<String, Set<MessageQueue>>();
    //真正的订阅关系,保存当前消费者组订阅了哪些Topic的哪些Tag
    protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
    protected String consumerGroup;
    protected MessageModel messageModel;
    //消费分配策略的实现
    protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    //client实例对象
    protected MQClientInstance mQClientFactory;
}

核心方法

public abstract class RebalanceImpl {
    //为MessageQueue加锁
    public boolean lock(final MessageQueue mq) {}
    //执行Rebalance操作
    public void doRebalance(final boolean isOrder) {}
    //通知Message发生变化,这个方法在Push和Pull两个类中被重写
    public abstract void messageQueueChanged(final String topic, final Set<MessageQueue> mqAll,
                                             final Set<MessageQueue> mqDivided);
    //去掉不再需要的 MessageQueue
    public abstract boolean removeUnnecessaryMessageQueue(final MessageQueue mq, final ProcessQueue pq);
    //执行消息拉取请求
    public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
    //在Rebalance中更新processQueue
    private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
                                                       final boolean isOrder)

}

Rebalancelmpl 、 RebalancePushImpl 、 RebalancePullImpl 是Rebalance的核心实现,主要逻辑都在Rebalancelmpl中,因为Pull消费者和Push消费者对Rebalance的需求不同,在各自的实现中重写了部分方法,以满足自身需求

如果有一个消费者实例下线了,Broker和其他消费者是怎么做Rebalance的呢
image

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }

    log.info(this.getServiceName() + " service end");
}

目前队列分配策略有以下5种实现方法

  • AllocateMessageQueueAveragely:平均分配,也是默认使用的策略(强烈推荐)。
  • AllocateMessageQueueAveragelyByCircle:环形分配策略。
  • AllocateMessageQueueByConfig:手动配置。
  • AllocateMessageQueueConsistentHash:一致性Hash分配。
  • AllocateMessageQueueByMachineRoom:机房分配策略

与RocketMQ - 消费者Rebalance机制相似的内容:

RocketMQ - 消费者Rebalance机制

客户端是通过Rebalance服务做到高可靠的。当发生Broker掉线、消费者实例掉线、Topic 扩容等各种突发情况时,消费者组中的消费者实例是怎么重平衡,以支持全部队列的正常消费的呢? RebalancePullImpl 和 RebalancePushImpl 两个重平衡实现类,分别被 Defa

【RocketMQ】Rebalance负载均衡总结

消费者负载均衡,是指为消费组下的每个消费者分配订阅主题下的消费队列,分配了消费队列消费者就可以知道去消费哪个消费队列上面的消息,这里针对集群模式,因为广播模式,所有的消息队列可以被消费组下的每个消费者消费不涉及负载均衡,而集群模式一个消息队列同一时间只能分配给组内的一个消费者进行消费。 Rocket

RocketMQ - 消费者概述

消费流程 消费者组: 一个逻辑概念,在使用消费者时需要指定一个组名。一个消费者组可以订阅多个Topic。 消费者实例: 一个消费者组程序部署了多个进程,每个进程都可以称为一个消费者实例。 订阅关系: 一个消费者组订阅一个 Topic 的某一个 Tag,这种记录被称为订阅关系。RocketMQ规定消费

RocketMQ - 消费者启动机制

RocketMQ客户端中有两个独立的消费者实现类:org.apache.rocketmq.client.consumer.DefaultMQPullConsumer 和 org.apache.rocketmq.client.consumer.DefaultMQPushConsumer Default

RocketMQ - 消费者进度保存机制

RocketMQ设计了远程位点管理和本地位点管理两种位点管理方式。集群消费时,位点由客户端提交给Broker保存,具体实现代码在RemoteBrokerOffsetStore.java文件中;广播消费时,位点保存在消费者本地磁盘上,实现代码在LocalFileOffsetStore.java文件中

RocketMQ - 消费者消费方式

RocketMQ的消费方式包含Pull和Push两种 Pull方式:用户主动Pull消息,自主管理位点,可以灵活地掌控消费进度和消费速度,适合流计算、消费特别耗时等特殊的消费场景。缺点也显而易见,需要从代码层面精准地控制消费,对开发人员有一定要求。 在 RocketMQ 中org.apache.ro

RocketMQ消费者是如何负载均衡的

摘要:RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。 本文分享自华为云社区《一文讲透RocketMQ消费者是如何负载均衡的》,作者:勇哥java实战分享。 RocketMQ 支持两种消息模式:集群消费( Clustering )和

RocketMQ - 生产者最佳实践总结

相对消费者而言,生产者的使用更加简单,一般关注消息类型、消息发送方法和发送参数,即可正常使用RocketMQ发送消息 常用消息类型 | 消息类型 | 优点 | 缺 点 | 备注 | | | | | | | 普通消息(并发消息) | 性能最好。单机TPS的级别为100 000 | 消息的生产和消费都无

【RocketMQ】消息的消费总结

消费者从Broker拉取到消息之后,会将消息提交到线程池中进行消费,RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求ConsumeRequest将消费请求提交到线程池处理,否则需要分批构建进行提交。 消息消费 在消息被提交到线程池后进行处理时,会调

【RocketMQ】【源码】负载均衡源码分析

RocketMQ在集群模式下,同一个消费组内,一个消息队列同一时间只能分配给组内的某一个消费者,也就是一条消息只能被组内的一个消费者进行消费,为了合理的对消息队列进行分配,于是就有了负载均衡。 ![img](https://img2022.cnblogs.com/blog/2612945/20220