消费者从Broker拉取到消息之后,会将消息提交到线程池中进行消费,RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求ConsumeRequest
将消费请求提交到线程池处理,否则需要分批构建进行提交。
在消息被提交到线程池后进行处理时,会调用消息监听器的consumeMessage
进行消息消费,它返回消息的消费结果状态,状态有两种分别为CONSUME_SUCCESS
和RECONSUME_LATER
:
在消息消费完毕之后,会根据consumeMessage
方法返回的结果状态进行处理,对ackIndex的值进行设置,ackIndex的值用于在下一步中处理消费失败的消息。
前面可知消费结果状态有以下两种:
消费的总消息个数 - 1
,表示消息都消费成功。广播模式
广播模式下,如果消息消费失败,只将失败的消息打印出来不做其他处理。
集群模式
开启for循环,初始值为i = ackIndex + 1
,结束条件为i < consumeRequest.getMsgs().size()
,上面可知ackIndex有两种情况:
消费成功:ackIndex值为消息大小-1,此时ackIndex + 1的值等于消息的个数大小,不满足for循环的执行条件,相当于消息都消费成功,不需要进行失败的消息处理。
延迟消费:ackIndex值为-1,此时ackIndex+1为0,满足for循环的执行条件,从第一条消息开始遍历到最后一条消息,向Broker发送CONSUMER_SEND_MSG_BACK
请求,如果发送成功Broker会根据延迟等级,放入不同的延迟队列中,到达延迟时间后,消费者将会重新进行拉取,如果发送失败,消费次数加1,并加入到失败消息列表中,稍后重新提交到消息消费线程池进行处理。
延迟级别
RocketMQ的延迟级别对应的延迟时间常量定义如下:
public class MessageStoreConfig {
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}
延迟级别与延迟时间对应关系:
延迟级别0 ---> 对应延迟时间1s,也就是延迟1秒后消费者重新从Broker拉取进行消费
延迟级别1 ---> 延迟时间5s
延迟级别2 ---> 延迟时间10s
...
以此类推,最大的延迟时间为2h。
在向Broker发送CONSUMER_SEND_MSG_BACK请求的时候,会从上下文中获取设置的延迟级别(默认为0,也就是延迟1s),然后设置以下信息,向Broker发送请求:
CONSUMER_SEND_MSG_BACK
;Broker对CONSUMER_SEND_MSG_BACK
类型的请求处理逻辑如下:
MessageExtBrokerInner
,设置消息的相关信息,此时相当于生成了一个全新的消息(会设置之前消息的ID),重新添加到CommitLog中,消息主题的设置有两种情况:
asyncPutMessage
存储消息;asyncPutMessage
方法中,会对延迟级别进行判断,如果延迟时间级别大于0,说明消息需要延迟消费,此时做如下处理:
SCHEDULE_TOPIC_XXXX
;SCHEDULE_TOPIC_XXXX
主题下,会根据延迟级别创建对应的消息队列,所以这一步会根据消息的延迟级别投递到对应的队列中;RMQ_SYS_SCHEDULE_TOPIC
;总结
消费者在消息消费失败的时候,会向Broker发送CONSUMER_SEND_MSG_BACK
请求,在请求处理中会判断消息的消费次数是否大于最大的消费次数,如果超过最大消费次数,会将消息投递到死信队列中。
如果未达到最大的消费次数,会重新生成一条消息,使用重试主题(%RETRY% + 消费组名称),从中随机选取一个队列稍后进行投递,同时也会设置对应的延迟级别,设置延迟级别之后,在消息存储之前会对延迟级别进行判断,如果需要延迟消费,会使用RocketMQ默认创建的SCHEDULE_TOPIC_XXXX
主题,先根据延迟级别将消息投递到对应的延迟队列中,然后由一个定时任务去检测这个主题下的消息,当消息到达延迟的时间后,再将消息取出投递到原本主题下的消息队列中,在这里就是之前设置的重试主题下的队列,也就是将消息投递到重试队列中,之后的流程就与普通消息的存储一致,将消息存入CommitLog中,再创建对应的ConsumeQueue数据,消费者就可以拉取到消息重新进行消费。
消费者在启动的时候,会处理订阅的Topic数据,如果是集群模式,会自动添加重试主题的订阅(%RETRY% + 消费组名称),然后就可以从重试主题中拉取到对应的重试消息进行消费。
以上步骤处理完毕后,首先调用removeMessage从处理队列中移除消息并返回拉取消息的偏移量,然后更新拉取偏移量,也就是消费进度。
RocketMQ消费模式分为广播模式和集群模式,广播模式下消费进度保存在每个消费者端,集群模式下消费进度保存在Broker端。
广播模式
广播模式对应的OffSetStore
实现类为LocalFileOffsetStore
,使用了一个ConcurrentMap类型的变量offsetTable
存储每个消息队列对应的拉取偏移量,KEY为消息队列,value为该消息队列对应的拉取偏移量。
在更新拉取进度的时候,对offsetTable
中的值进行更新,需要注意这里只是更新了offsetTable
中的数据,并没有持久化到磁盘。
集群模式
集群模式对应的实现类为RemoteBrokerOffsetStore
,更新进度与广播模式下的更新类似,都是只更新了offsetTable中的数据。
持久化的触发
消费者在启动的时候注册了定时任务,定时将消息拉取进度进行持久化,对于广播模式,将每个消息队列对应的拉取偏移量持久化到本地文件即可,对于集群模式,由于拉取进度保存在Broker端,所以需要向Broker发送请求进行持久化,在RocketMQ的存储目录中有一个对应的文件,叫consumerOffset.json,里面的offsetTable中保存了每个消息队列的消费进度,持久化时会将消费进度写入这个文件:
{
"offsetTable":{
"TestTopic@TestTopicGroup":{ // 主题名称@消费者组名称
0:0, // 每个消息队列对应的消费进度,Key中的0表示队列0,value中的0表示消息在ConsumeQueue中的逻辑偏移量
1:1,
2:1,
3:0
}
}
}
RocketMQ在进行消费的时候,不管是否消费成功,都会推进消费进度,对本次消费进度进行更新,对于消费失败的消息,会将其重新放入重试队列中消费,所以向后推进消费进度不会导致消费失败的消息无法重新消费。不过更新消费进度是先更新到内存中,持久化的操作是在定时任务中,所以有一定的概率,在定时任务执行之前,消费者突然宕机未能成功向Broker发送请求进行更新,重启消费者后,集群模式下会先从broker获取消费进度,而Broker中的数据未更新,导致消费者重复消费消息。
RocketMQ消息的消费相关源码可参考:【RocketMQ】【源码】消息的消费