RocketMQ - 生产者启动流程

rocketmq,生产者,启动,流程 · 浏览次数 : 136

小编点评

**MQClientInstance类核心方法** **更新Topic Route InfoFromNameServer方法** 从多个NameServer中获取最新Topic路由信息,更新本地缓存。 **清理已经下线的Broker方法** 清理已经下线的Broker,检查其地址是否有效。 **检查Client是否在Broker中有效方法** 在本地注册一个消费者,检查其是否在Broker中有效。 **发送客户端的心跳信息给所有的Broker方法** 每个消费者组的消费者,需要在本地注册一个生产者,并发送给该生产者的心跳信息。 **在本地注册一个消费者方法** 同步方法 `registerConsumer()`,消费者可以注册一个消费者,并同步该消费者的状态。 **取消本地注册的消费者方法** 同步方法 `unregisterConsumer()`,消费者可以取消其注册的消费者。 **在本地注册一个生产者方法** 同步方法 `registerProducer()`,生产者可以注册一个生产者,并同步该生产者的状态。 **取消本地注册的生产者方法** 同步方法 `unregisterProducer()`,生产者可以取消其注册的生产者。 **注册一个管理实例方法** `registerAdminExt()`方法,立即执行一次 Rebalance。 **在本地缓存中查找Slave Broker信息方法** `findBrokerAddressInSubscribe()`方法,获取一个订阅关系中每个队列的消费进度。 **在本地缓存中查找Master Broker地址方法** `findBrokerAddressInPublish()`方法,获取一个发布关系中每个队列的消费进度。 **查找消费者id列表方法** `findConsumerIdList()`方法,通过Topic名字查找Broker地址。 **通过Topic名字查找Broker地址方法** `findBrokerAddrByTopic()`方法,通过Topic名字查找Broker地址。 **重置消费位点方法** `resetOffset()`方法,重置消费者对某个Topic的消费位点。 **获取消费者的消费统计信息方法** `consumerRunningInfo()`方法,获取消费者的消费统计信息。

正文

生产者启动流程

image
DefaultMQProducer是RocketMQ中默认的生产者实现

核心属性:

namesrvAddr: 继承自 ClientConfig,表示 RocketMQ 集群的Namesrv 地址,如果是多个则用分号分开。比如:127.0.0.1:9876;127.0.0.2:9876。

clientIP: 使用的客户端程序所在机器的 IP地址。支持 IPv4和IPv6,IPv4 排除了本地的环回地址(127.0.xxx.xxx)和私有内网地址(192.168.xxx.xxx)。这里需要注意的是,如果 Client 运行在Docker 容器中,获取的 IP 地址是容器所在的 IP 地址,而非宿主机的IP地址。

instanceName: 实例名,每个实例都需要取唯一的名字,因为有时我们会在同一个机器上部署多个程序进程,如果名字有重复就会导致启动失败。

vipChannelEnabled: 这是一个 boolean 值,表示是否开启 VIP通道。VIP 通道和非VIP通道的区别是:在通信过程中使用的端口号不同。

clientCallbackExecutorThreads: 客户端回调线程数。该参数表示 Netty 通信层回调线程的个数 ,默认值 Runtime.getRuntime().availableProcessors()表示当前CPU的有效个数。

pollNameServerInterval: 获取 Topic 路由信息的间隔时长,单位为 ms,默认为30 000ms。

heartbeatBrokerInterval: 与Broker心跳间隔的时长,单位为 ms,默认为30 000ms。

defaultMQProducerImpl: 默认生产者的实现类,其中封装了Broker的各种API(启动及关闭生产者的接口)。如果你想自己实现一个生产者,可以添加一个新的实现,保持DefaultMQProducer对外接口不变,用户完全没有感知。

producerGroup: 生产者组名,这是一个必须传递的参数。RocketMQ-way表示同一个生产者组中的生产者实例行为需要一致。

sendMsgTimeout: 发送超时时间,单位为ms。

compressMsgBodyOverHowmuch: 消息体的容量上限,超过该上限时消息体会通过ZIP进行压缩,该值默认为4MB。

retryTimesWhenSendFailed: 同步发送失败后重试的次数。默认为2次,也就是说,一共有3次发送机会。

retryTimesWhenSendAsyncFailed: 异步发送失败后重试的次数。默认为2次。异步重试是有条件的重试,并不是每次发送失败后都重试 。 源代码可以查看 org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync()方法 。 每次发送失败抛出异常后 , 通过执行onExceptionImpl()方法来决定什么场景进行重试

核心方法

start():这是启动整个生产者实例的入口,主要负责校验生产者的配置参数是否正确,并启动通信通道、各种定时计划任务、Pull服务、Rebalance服务、注册生产者到Broker等操作。

shutdown(): 关闭本地已注册的生产者,关闭已注册到Broker的客户端。

fetchPublishMessageQueues(Topic): 获取一个Topic有哪些Queue。在发送消息、Pull消息时都需要调用。

send(Message msg): 同步发送普通消息。

send(Message msg,long timeout): 同步发送普通消息(超时设置)。

send(Message msg,SendCallback sendCallback): 异步发送普通消息。

send(Message msg , SendCallback sendCallback , long timeout): 异步发送普通消息(超时设置)。

sendOneway(Message msg): 发送单向消息。只负责发送消息,不管发送结果。

send(Message msg,MessageQueue mq): 同步向指定队列发送消息。

send(Message msg,MessageQueue mq,long timeout): 同步向指定队列发送消息(超时设置)。同步向指定队列发送消息时,如果只有一个发送线程,在发送到某个指定队列中时,这个指定队列中的消息是有顺序的,那么就按照发送
时间排序;如果某个Topic的队列都是这种情况,那么我们称该Topic的全部消息是分区有序的。

send(Message msg , MessageQueue mq , SendCallback sendCallback): 异步发送消息到指定队列。

send(Message msg , MessageQueue mq , SendCallback sendCallback,long timeout): 异步发送消息到指定队列(超时设置)。

send(Message msg,MessageQueueSelector selector,Object arg,SendCallback sendCallback): 自定义消息发送到指定队列。通过实现MessageQueueSelector接口来选择将消息发送到哪个队列。

send(Collection<Message>msgs): 批量发送消息。

核心管理接口

createTopic(String key , String newTopic , int queueNum): 创建Topic。
viewMessage(String offsetMsgId): 根据消息id查询消息内容。

启动流程

生产者启动的流程比消费者启动的流程更加简单,一般用户使用DefaultMQProducer的构造函数构造一个生产者实例,并设置各种参数。比如Namesrv地址、生产者组名等,调用start()方法启动生产者实例,start()方法调用了生产者默认实现类的start()方法启动,这里我们主要讲实现类的start()方法内部是怎么实现的,
image
第一步: 通过 switch-case 判断当前生产者的服务状态,创建时默认状态是CREATE_JUST。设置默认启动状态为启动失败。
第二步: 执行 DefaultMQProducerImpl.checkConfig()方法。校验生产者实例设置的各种参数。比如生产者组名是否为空、是否满足命名规则、长度是否满足等。
第三步: 执行 DefaultMQProducer.changeInstanceNameToPID()方法。校验instancename,如果是默认名字则将其修改为进程id。
第四步: 执行 MQClientManager.getOrCreateMQClientInstance()方法。根据生产者组名获取或者初始化一个

MQClientInstance实例与 clientId是一一对应的,而clientId是由clientIP、instanceName 及 unitName 构成的。因此,为了减少客户端的使用资源,如果将所有的 instanceName和 unitName设置为同样的值,就会只创建一个 MQClientInstance实例

ClientConfig.java

public String buildMQClientId() {
    StringBuilder sb = new StringBuilder();
    sb.append(this.getClientIP());
    sb.append("@");
    sb.append(this.getInstanceName());
    if (!UtilAll.isBlank(this.unitName)) {
        sb.append("@");
        sb.append(this.unitName);
    }
    if (enableStreamRequestType) {
        sb.append("@");
        sb.append(RequestType.STREAM);
    }
    return sb.toString();
}

MQClientInstance 实例的功能是管理本实例中全部生产者与消费者的生产和消费行为 。
org.apache.rocketmq.client.impl.factory.MQClientInstance

类的核心属性

public class MQClientInstance { 
    ...
    private final ClientConfig clientConfig;
    private final int instanceIndex;
    private final String clientId;
    private final long bootTimestamp = System.currentTimeMillis();
    //当前client实例的全部生产者的内部实例。
    private final ConcurrentMap<String/* group */, MQProducerInner> producerTable = new ConcurrentHashMap<String, MQProducerInner>();
    //当前client实例的全部消费者的内部实例。
    private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    //当前client实例的全部管理实例。
    private final ConcurrentMap<String/* group */, MQAdminExtInner> adminExtTable = new ConcurrentHashMap<String, MQAdminExtInner>();
    private final NettyClientConfig nettyClientConfig;
    //其实每个client也是一个Netty Server,也会支持Broker访问,这里实现了全部client支持的接口
    private final MQClientAPIImpl mQClientAPIImpl;
    //管理接口的本地实现类
    private final MQAdminImpl mQAdminImpl;
    //当前生产者、消费者中全部Topic的本地缓存路由信息
    private final ConcurrentMap<String/* Topic */, TopicRouteData> topicRouteTable = new ConcurrentHashMap<String, TopicRouteData>();
    private final Lock lockNamesrv = new ReentrantLock();
    private final Lock lockHeartbeat = new ReentrantLock();
    private final ConcurrentMap<String/* Broker Name */, HashMap<Long/* brokerId */, String/* address */>> brokerAddrTable =
        new ConcurrentHashMap<String, HashMap<Long, String>>();
    private final ConcurrentMap<String/* Broker Name */, HashMap<String/* address */, Integer>> brokerVersionTable =
        new ConcurrentHashMap<String, HashMap<String, Integer>>();
    //本地定时任务,比如定期获取当前 Namesrv 地址、定期同步Namesrv信息、定期更新Topic路由信息、定期发送心跳信息给Broker、定期清理已下线的Broker、
    //定期持久化消费位点、定期调整消费线程数(这部分源代码被官方删除了)
    private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "MQClientFactoryScheduledThread");
        }
    });
    //请求的处理器,从处理方法 processRequest() 中我们可以知道目前支持哪些功能接口
    private final ClientRemotingProcessor clientRemotingProcessor;
    //Pull服务
    private final PullMessageService pullMessageService;
    //重新平衡服务。定期执行重新平衡方法this.mqClientFactory.doRebalance()。
    //这里的 mqClientFactory 就是 MQClientInstance 实例,通过依次调用MQClientInstance中保存
    //的消费者实例的doRebalance()方法,来感知订阅关系的变化、集群变化等,以达到重新平衡。
    private final RebalanceService rebalanceService;
    private final DefaultMQProducer defaultMQProducer;
    //消费监控 。 比如拉取 RT(Response Time,响应时间)、拉取TPS(Transactions Per Second,每秒处理消息数)、消费RT等都可以统计。
    private final ConsumerStatsManager consumerStatsManager;
 }

类的核心方法

public class MQClientInstance {  
    //从多个Namesrv中获取最新Topic路由信息,更新本地缓存
    public void updateTopicRouteInfoFromNameServer(){}
    //清理已经下线的Broker
    private void cleanOfflineBroker(){}
    //检查Client是否在Broker中有效
    private void checkClientInBroker(){}
    //发送客户端的心跳信息给所有的Broker。
    public void sendHeartbeatToAllBrokerWithLock() {}
    //在本地注册一个消费者。
    public synchronized boolean registerConsumer(final String group, final MQConsumerInner consumer) {}
    //取消本地注册的消费者。
    public synchronized void unregisterConsumer(final String group) {}
    //在本地注册一个生产者。
    public synchronized boolean registerProducer(final String group, final DefaultMQProducerImpl producer) {}
    //取消本地注册的生产者。
    public synchronized void unregisterProducer(final String group){}
    //注册一个管理实例。
    public boolean registerAdminExt(final String group, final MQAdminExtInner admin)
    //立即执行一次 Rebalance。该操作是通过 RocketMQ 的一个CountDownLatch2锁来实现的。
    public void rebalanceImmediately(){}
    //对于所有已经注册的消费者实例 ,执行一次Rebalance。
    public void doRebalance(){}
    //在本地缓存中查找Slave Broker信息
    public FindBrokerResult findBrokerAddressInSubscribe(
        final String brokerName,
        final long brokerId,
        final boolean onlyThisBroker
    ){}
    //在本地缓存中查找Master Broker地址。
    public String findBrokerAddressInPublish(final String brokerName){}
    //查找消费者id列表。
    public List<String> findConsumerIdList(final String topic, final String group) {}
    //通过Topic名字查找Broker地址。
    public String findBrokerAddrByTopic(final String topic) {}
    //重置消费位点。
    public synchronized void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable){}
    //获取一个订阅关系中每个队列的消费进度。
    public Map<MessageQueue, Long> getConsumerStatus(String topic, String group){}
    //获取本地缓存Topic路由。
    public ConcurrentMap<String, TopicRouteData> getTopicRouteTable(){}
    //直接将消息发送给指定的消费者消费,和正常投递不同的是,指定了已经订阅的消费者组中的一个,
    //而不是全部已经订阅的消费者。一般适用于在消费消息后,某一个消费者组想再消费一次的场景。    
    public ConsumeMessageDirectlyResult consumeMessageDirectly(final MessageExt msg,
        final String consumerGroup,
        final String brokerName) {}
    //获取消费者的消费统计信息。包含消费RT、消费TPS等。
    public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup){}
 } 

《RocketMQ分布式消息中间件:核心原理与最佳实践_李伟 (作者) _电子工业出版社 (2020年7月1日)》

与RocketMQ - 生产者启动流程相似的内容:

RocketMQ - 生产者启动流程

生产者启动流程 DefaultMQProducer是RocketMQ中默认的生产者实现 核心属性: namesrvAddr: 继承自 ClientConfig,表示 RocketMQ 集群的Namesrv 地址,如果是多个则用分号分开。比如:127.0.0.1:9876;127.0.0.2:9876

RocketMQ - 生产者原理

https://rocketmq.apache.org/ Apache RocketMQ是一款开源的、分布式的消息投递与流数据平台。出生自阿里巴巴,在阿里巴巴内部经历了3个版本后,作为Apache 顶级开源项目之一直到现在。在GitHub上有10000+star、5000+fork、170+cont

RocketMQ - 生产者消息发送流程

RocketMQ客户端的消息发送通常分为以下3层 业务层:通常指直接调用RocketMQ Client发送API的业务代码。 消息处理层:指RocketMQ Client获取业务发送的消息对象后,一系列的参数检查、消息发送准备、参数包装等操作。 通信层:指RocketMQ基于Netty封装的一个RP

RocketMQ - 生产者最佳实践总结

相对消费者而言,生产者的使用更加简单,一般关注消息类型、消息发送方法和发送参数,即可正常使用RocketMQ发送消息 常用消息类型 | 消息类型 | 优点 | 缺 点 | 备注 | | | | | | | 普通消息(并发消息) | 性能最好。单机TPS的级别为100 000 | 消息的生产和消费都无

【RocketMQ】RocketMQ存储结构设计

CommitLog 生产者向Broker发送的消息,会以顺序写的方式,写入CommitLog文件,CommitLog文件的根目录由配置参数storePathRootDir决定,默认每一个CommitLog的文件大小为1G,如果文件写满会新建一个CommitLog文件,以该文件中第一条消息的偏移量为文

【RocketMQ】消息的存储

当Broker收到生产者的消息发送请求时,会对请求进行处理,从请求中解析发送的消息数据,接下来以单个消息的接收为例,看一下消息的接收过程。 数据校验 封装消息 首先Broker会创建一个MessageExtBrokerInner对象封装从请求中解析到的消息数据,它会将Topic信息、队列ID、消息内

【RocketMQ】消息的拉取总结

在上一讲中,介绍了消息的存储,生产者向Broker发送消息之后,数据会写入到CommitLog中,这一讲,就来看一下消费者是如何从Broker拉取消息的。 RocketMQ消息的消费以组为单位,有两种消费模式: 广播模式:同一个消息队列可以分配给组内的每个消费者,每条消息可以被组内的消费者进行消费。

【RocketMQ】【源码】顺序消息实现原理

**全局有序** 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一般不使用。 ![](https://img2022.cnblogs.com/blog/2612945

【RocketMQ】顺序消息实现总结

全局有序 在RocketMQ中,如果使消息全局有序,可以为Topic设置一个消息队列,使用一个生产者单线程发送数据,消费者端也使用单线程进行消费,从而保证消息的全局有序,但是这种方式效率低,一般不使用。 局部有序 假设一个Topic分配了两个消息队列,生产者在发送消息的时候,可以对消息设置一个路由I

[转帖]RocketMQ - nameSrv和Broker

RocketMQ RocketMQ是一个统一的消息传递引擎,轻量级的数据处理平台。 Name Server Name Server充当路由消息的提供者,生产者(Producer)或消费者(Customer)可以通过Name Server查找各主题对应的Broker IP列表,多个Name Serve