MQ系列1:消息中间件执行原理
MQ系列2:消息中间件的技术选型
MQ系列3:RocketMQ 架构分析
MQ系列4:NameServer 原理解析
MQ系列5:RocketMQ消息的发送模式
MQ系列6:消息的消费
MQ系列7:消息通信,追求极致性能
MQ系列8:数据存储,消息队列的高可用保障
MQ系列9:高可用架构分析
我们实际系统中有很多操作,不管你执行多少次,都应该产生一样的效果或返回一样的结果。 例如:
以上等等很多重要的场景,都需要幂等的特性来支持。
幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中。 在编程中.一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。
例如,“getUserSex()和setRight()”函数就是一个幂等函数,包括数据库中的查询和删除也是一样的道理,它是天然幂等的。总之,幂等就是一个操作,不论执行多少次,产生的效果和返回的结果都是一样的 。
我们先来回顾下 Message Queue的构成,这边以RocketMQ为例子:
RocketMQ主要有四大核心组成部分:NameServer、Broker、Producer以及Consumer四部分。
可以看出,消息发送和消息消费两个步骤是有可能产生消息不幂等的问题。
为保证消息的正确性发送,超时重试、异常重试、消费完成确认机制等能力都是可以使用,并对业务产生影响的。
我们举个例子,如果你购买一件商品,用户付款完成之后,通过MQ消息的异步通知,告知下游服务出库和通知。如果消息通知出现了问题或者下游消息消费出现了问题,导致无法ACK,都有可能导致重复的出库和通知。
MQ消息生产部分,就是下图中的步骤1、步骤2、步骤3:
如果3 消息确认故障导致消息丢失,则消息生产端 MQ-Client Producer 超时后会重发消息,这时候可能就有重复消息,如何保证幂等呢?
因为消息重发也是MQ-Client Producer发起的,消息的处理是消息队列的服务MQ-Server处理的,MQ-Server将数据进行了持久化么,这时候我们可以设计一个唯一的 msgId,作为去重的依据,无论重发多少次,msgId都是一样的,然后在DB数据库中将这个msgId设置为unique key,不允许重复,他有如下特性:
使用这个 msgId,可以保证只有1条消息落地到数据库中,就保证了消息生产端的幂等。
MQ消息消费部分,就是下图中的步骤4、步骤5、步骤6:
★ 说明:以上步骤须做一致性保障
这边重灾区就是步骤5,如果因为故障导致消息丢失,消息队列服务 MQ-Server 在超时后会重发消息,这样 MQ-Client Producer/Consumer 就会重复收到消息。
因为消息重发是 消息队列服务 MQ-Server 发起的,MQ-Client Consumer 负责消息消费,消息重发必然会导致业务重复消费(比如重复发消息、重复出库)。所以一样的道理,必然使用msgId来做判断,如果存在库中就进行消费,然后精确删除库中的数据。如果数据库中不存在,就忽略,避免重复消费。
同样的,这个msgID的特性如下:
这种方式最常见应用在:商品下单、消费支付、帖子点赞和留言等。
无论是何种消息队列,造成重复消费原因其实都是类似的。正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。
只是不同的消息队列发送的确认信息形式不同,例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念,每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了。
那造成重复消费的原因? 就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。
如何解决?这个问题针对业务场景来答分以下几点
(1)给这个消息做一个唯一主键,做数据库insert,如果出现重复消费情况,会导致主键冲突,避免数据库出现脏数据。
(2)update 和 delete 支持天然幂等性,拿到这个消息做redis的set的操作,那就容易了,不用解决,set操作天然幂等操作。
(3)第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。