可重入锁思想,设计MQ迁移方案

mq · 浏览次数 : 0

小编点评

本文将探讨在一个需要从 Kafka 切换到 RocketMQ 的场景中,如何处理消息的幂等性和重复消费问题。由于 RocketMQ 不支持消息幂等性,我们需要自定义实现这一功能。 ### 1. 场景问题 在某些场景下,我们需要将原本使用 Kafka 的消息处理流程迁移到 RocketMQ,同时保证消息的幂等性和避免重复消费。由于 RocketMQ 不提供内置的幂等性保障,我们需要自行实现这一机制。 ### 2. 场景思考 - **顺序性和临界状态**:在分布式架构中,我们需要考虑生产者和消费者的顺序性和临界状态。 - **MQ消费的不确定性和重试**:由于生产者发送消息到 Kafka 或 RocketMQ 后,消费者拉取消费时可能不是顺序的,且网络状况可能导致消息重复消费。因此,需要设计可靠的重试机制。 - **分布式锁的使用**:为了避免同一消息被多个实例重复消费,我们需要使用分布式锁来确保同一消息在同一时间只被一个实例消费。 ### 3. 方案设计 #### 3.1 使用分布式锁 在消费端,我们需要使用分布式锁来确保同一消息只被一个实例消费。这里我们可以使用 Redis 或 ZooKeeper 实现分布式锁。 示例代码: ```python import redis lock_key = "my_lock" lock_value = "my_lock_value" # 尝试获取锁 lock = redis.Lock(lock_key, timeout=10) if lock.acquire(): try: # 解决重复消费的问题 pass finally: lock.release() ``` #### 3.2 消息幂等性设计 为了保证消息的幂等性,我们需要在消息 ID 上添加一个全局唯一的 ID。这个 ID 可以是消息发送时的时间戳,加上随机数,或者使用其他方式生成。 示例代码: ```python import time import uuid def generate_message_id(): return str(uuid.uuid4()) + str(int(time.time() * 1000)) message_id = generate_message_id() ``` #### 3.3 切量开关与消费逻辑 - **切量开关**:我们可以通过配置文件定义一个切量函数,根据一定的策略决定哪些消息应该被消费。 - **消费逻辑**:在消费端,我们需要检查锁是否已经被当前实例占用。如果不是,则尝试获取锁;如果是,则表示其他实例已经在处理该消息,我们需要等待或重试。 ### 4. 测试与优化 在完成上述设计后,我们需要对新的消息处理流程进行充分的测试,以确保其正确性和稳定性。在测试过程中,我们还需要关注性能和资源消耗等方面的问题。 ### 5. 总结 通过本文的分析和讨论,我们提出了一种在迁移 MQ 消息处理流程时,如何保证消息幂等性和避免重复消费的方法。这种方法结合了分布式锁和消息幂等性的设计,可以为我们的项目带来更好的可扩展性和稳定性。

正文

image

如果你的MQ消息要从Kafka切换到RocketMQ且不停机,怎么做?在让这个MQ消息调用第三方发奖接口,但无幂等字段又怎么处理?今天小傅哥就给大家分享一个关于MQ消息在这样的场景中的处理手段。

这是一种比较特例的场景,需要保证切换的MQ消息不被两端同时消费,并且还需要在一段消费失败后的MQ还可以继续重试。并且这一端消费的MQ消息,也要保证自身的幂等。

我们知道一般通用场景下,MQ消息都会有一个业务唯一ID值,用于接收方做仿重处理。但除此之外还应该有一个MQ消息本身的ID,这个ID也要全局唯一,每一条消息都要有一个ID,这是因为MQ是可能重复发送的(发送MQ成功,但获取MQ发送结果响应超时或更新库表消息状态失败,则重复发送),如果没有消息的唯一ID也就没法确保是哪一条消息了。

这个ID可以用于;唯一标识、去重、链路追踪、幂等性、事务以及安装性等,但可能有些伙伴在做MQ消息发送的时候,是容易忽略而没有在MQ中添加这个ID,或者随意用时间戳来当ID用,这样都是不合理的。会影响一些场景的代码健壮性设计。

需求背景描述好了,接下来,我们看看这样的场景怎么设计。

1. 场景问题

将原本使用 Kafka 的MQ方式,迁移到 RocketMQ,同时部分场景的 MQ 消息调用三方接口是没有幂等字段的,需要做好程序兼容处理。

2. 场景思考

首先我们要知道在分布式架构下,我们每做的技术方案都要考虑顺序性和临界状态。像是MQ的生产和消费都是多套应用实例部署的,那么生产端发送出来的MQ消息到不同的队列中也是有延迟和存放顺序以及拉取消费不同的情况。如;生产端发送MQ为A、B、C、D,但到Kafka/RocketMQ以及不同的消费端拉取时,不一定是A、B、C、D的顺序,那么直接做切量开关,是可能导致一个A消息在Kafka队列中消费完,点击切换开关(一种切量哈希计算手段,如消息{A}哈希值最后两位当做百分比用),正好RocketMQ也会把A消费掉。这样同一个消息就被重复消费了。

3. 方案设计

在整个方案设计中,我们要考虑几个非常重要的点。如图:

image

  • 一个是切换的两端MQ消费是抢占式加锁,避免重复消费。这是因为切量开关,切换过程中,两个消息队列中的MQ并不是顺序可靠的,可能存在重复消费,所以要加分布式锁。

  • 一段MQ消费失败要进行重试,但这个时候不能在消费失败后删分布式锁,因为MQ消费都是很快的,可能导致删锁后另外一端MQ进行了相同的消费。那可能有些伙伴会说,那也没关系呀,反正失败的这段没有消费成功。当往往失败并不一定是直接的结果失败,可能是网络失败,可能是超时失败等。也就是实际成功了,但超时反馈了。所以不能被其他端重复消费,并且要保证自己这一端消费失败后可重试。所以这块要设计可重入锁,也就是 setnx 加锁的值,为自身一段的 mq 类型,这样自己在接收mq消息以后,检查锁为自身加锁值可重试。这样也就保证了一端消费重试,不会让另外一端把MQ也跟着消费掉,因为setnx存在,并且有加锁值判断,所以不能进入。

  • 另外MQ消息还可能存在同一个MQ发送多次的场景,这个是非常正常的。比如,你再发送MQ的时候,超时网络抖动失败(1万次会有1次),那么就会补偿重发。但这个MQ已经发送过了,所以会接收2条MQ消息。那么在消费的时候,不能让2个MQ消息都进入消费中,因为多台实例消费,可能都去调用发奖了。那么这里还需要给MQ的ID进行幂等加锁。确保一个MQ消息,失败后,顺序轮训重试。也就保证了,发奖的过程中不会出现超发奖品。大部分三方接口还是有幂等字段的,有的话会更好。

  • 另外还有2个开关,一个是消费开关,一个是切量开关。消费开关要在整个新的MQ改造工程工程全部上线后开启,但还要被切量开关限定消费。开启后,切量开关才会生效。切量是一种哈希值的百分比比对,比如一个哈希值最后两位是10,那么切量配置小于等于10%则这个MQ则可以被切量后消费,另外一段则不消费这个MQ。

  • 另外,为了方便测试线上功能,还会加入白名单。不过大部分时候这类东西会用通用组件能力解决。

这样的场景方案设计,是非常值得积累的,同类的思想也可以帮我们解决很多共性问题。

与可重入锁思想,设计MQ迁移方案相似的内容:

可重入锁思想,设计MQ迁移方案

如果你的MQ消息要从Kafka切换到RocketMQ且不停机,怎么做?在让这个MQ消息调用第三方发奖接口,但无幂等字段又怎么处理?今天小傅哥就给大家分享一个关于MQ消息在这样的场景中的处理手段。 这是一种比较特例的场景,需要保证切换的MQ消息不被两端同时消费,并且还需要在一段消费失败后的MQ还可以继

从源码入手详解ReentrantLock,一个比synchronized更强大的可重入锁

写在开头 随手一翻,发现对于Java中并发多线程的学习已经发布了十几篇博客了,多线程 是Java基础中的重中之重!因此,可能还需要十几篇博客才能大致的讲完这部分的知识点,初学者对于这部分内容一定要多花心思,不可马虎!今天我们继续来学习一个重要知识点:ReentrantLock ReentrantLo

第125篇: 期约Promise基本特性

好家伙,本篇为《JS高级程序设计》第十章“期约与异步函数”学习笔记 1.非重入期约 1.1.可重入代码(百度百科) 先来了解一个概念 可重入代码(Reentry code)也叫纯代码(Pure code)是一种允许多个进程同时访问的代码。 为了使各进程所执行的代码完全相同,故不允许任何进程对其进行修

[转帖]ELF文件详解

一、ELF概述 1、ELF的定义 ELF(Executable and Linkable Format)文件是一种目标文件格式,常见的ELF格式文件包括:可执行文件、可重定位文件(.o)、共享目标文件(.so)、核心转储文件等。 ELF主要用于Linux平台,Windows下是PE/COFF格式。

C静态库的创建与使用--为什么要引入静态库?

C源程序需要经过预处理、编译、汇编几个阶段,得到各自源文件对应的可重定位目标文件,可重定位目标文件就是各个源文件的二进制机器代码,一般是.o格式。比如:util1.c、util2.c及main.c三个C源文件,经过预处理器、编译器、汇编器的处理,就可以得到各自的目标文件util1.o,util2.o

[转帖]010 Linux 文本统计与去重 (wc 和 uniq)

https://my.oschina.net/u/3113381/blog/5427461 wc 命令一般是作为组合命令的一员与其他命令一同起到统计的作用。而一般情况下使用 wc -l 命令较多。 uniq 可检查文本文件中重复出现的行,一般与 sort 命令结合使用。一起组合搭配使用完成统计、排序

我的第一个项目(五):(前后端)注册用户名查重

好家伙, bug终究还是来了,而且是很离谱的bug 来吧,发现问题,再解决问题 1.注册无法检测到用户名重复 也就是说一个用户名可无限注册, 来看bug(。。。) (看来是后端验证逻辑出了问题) 要是这么上线估计直接寄了 2.完成注册用户名查重 大概率是后端出了问题 这里我们先去看看后端,从后端去改

[转帖]使用 TiUP 部署运维 TiDB 线上集群

https://docs.pingcap.com/zh/tidb/stable/tiup-cluster 本文重在介绍如何使用 TiUP 的 cluster 组件,如果需要线上部署的完整步骤,可参考使用 TiUP 部署 TiDB 集群。 与 playground 组件用于部署本地测试集群类似,clu

[转帖]网络编程之长连接 、短连接、心跳机制与断线重连

https://cloud.tencent.com/developer/article/1953996?areaSource=104001.94&traceId=7WZNP412yK3vh7ebw4th0 概述 可承遇到,不知什么原因,一个夜晚,机房中,大片的远程调用连接断开。 第二天早上,用户访问

大数据 - DWS层 业务实现

统计主题 需求指标【ADS】输出方式计算来源来源层级 访客【DWS】pv可视化大屏page_log 直接可求dwd UV(DAU)可视化大屏需要用 page_log 过滤去重dwm UJ 跳出率可视化大屏需要通过 page_log 行为判断dwm 进入页面数可视化大屏需要识别开始访问标识dwd 连续