RabbitMQ 进阶使用之延迟队列 → 订单在30分钟之内未支付则自动取消

rabbitmq · 浏览次数 : 17

小编点评

**实现代码** ```java @Test public void testSendMessage() { // 设置队列属性 properties.setProperty("x-message-ttl", "3000"); // 发送消息 rabbitTemplate.send(message); // 查看队列状况 System.out.println("Queue size: " + rabbitTemplate.getQueueSize("com.qsl.message.ttl.queue")); } ``` **测试结果** ``` Queue size: 1 ``` **说明** * 首先,设置了队列属性 `x-message-ttl` 为 3000 毫秒。 * 然后,发送了一个消息到 `com.qsl.message.ttl.queue` 中。 * 由于队列属性设置了死信队列,因此消息不会被发送到 `com.qsl.dlx.queue` 中。 * 因此,队列大小为 1,表示只有一个消息在队列中。

正文

开心一刻

晚上,媳妇和儿子躺在沙发上

儿子疑惑的问道:妈妈,你为什么不去上班

媳妇:妈妈的人生目标是前20年靠父母养,后40年靠你爸爸养,再往后20年就靠你和妹妹养

儿子:我可养不起

媳妇:为什么

儿子:因为,呃...,我和你的想法一样

于家村长_不可置信

讲在前面

如果你们对 RabbitMQ 感到陌生,那可以停止往下阅读了

请先去查阅相关资料,对它有一个基本的了解之后再接着阅读本文

本文会以循序渐进的方式来讲解标题:

使用 RabbitMQ 的延迟队列来实现:订单在30分钟之内未支付则自动取消

所以请你们耐心逐步往下看

另外,实现标题的方式有很多,但本文只讲其中之一的 延迟队列,至于其他方式,不在本文讲解范围之内,如果想了解,烦请你们自行去查阅

消息何去何从

RabbitMQ 的模型架构,相信你们都知道

架构

消息Producer 生成,经 Exchange 路由到 Queue ,然后推给 Consumer 进行消费

消费消息有两种方式

  1. 推模式(Basic.Consume)
  2. 拉模式(Basic.Get)

如果 消息Exchange 无法路由到符合条件的队列时,该 消息 该如何处理,是返还给 Producer 还是直接丢弃?

如果 消息 被路由到 Queue 时发现没有任何消费者,该 消息 该如何处理,是存在 Queue 中还是返还给 Producer ?

作为一个牛皮的中间件,一旦涉及到可选项了,应该怎么做?

我相信你们已经想到了,那肯定是增加配置参数来支持可选项嘛!

mandatory

mandatory 参数用于设置消息是否必须被路由到队列中,默认值是 false

mandatory 参数设置为 true 时,Exchange 无法根据自身的类型和路由键找到一个符合条件的 Queue,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。当 mandatory 参数设置为 false 时,出现上述情形,则消息直接被丢弃

mandatory 值为 false

mandaroty_false

代码执行正常,但没有输出结果,所以我们不确定消息是否投递了

但我们可以通过 RabbitMQ 管理界面,看 Exchange 概况

mandaroty_exchange

来确定消息确实投递了

mandatory 值为 true 时,需要添加一个监听器 ReturnListener

mandatory_true

代码执行正常,同时也有输出结果

2024-06-01 14:54:52|AMQP Connection 10.5.108.226:5672|com.qsl.rabbit.PriorityMessageTest|INFO|59|Basic.Return 返回结果:mandatory test

也可以通过 RabbitMQ 管理界面,看 Exchange 概况来确定消息是否投递过

作为拓展,给你们留两个问题

  1. mandatory 设置为 true 的同时,不添加监听器 ReturnListener,会是什么结果
  2. mandatory 设置为 false 的同时,添加监听器 ReturnListener,又会是什么结果

immediate

immediate 参数用于设置消息是否立即发送给消费者,默认值是 false

immediate 参数设置为 true 时,如果消息路由到队列时发现队列上并没有任何消费者,那么该消息不会存入队列中,当与路由键匹配的所有队列都没有消费者时,该消息会通过 Basic.Return 返回至生产者

immediate 为 true ,消息路由到匹配的队列时

  1. 部分队列有消费者,有消费者的队列会立即将消息投递给消费者,没有消费者的队列会丢弃该消息
  2. 全部队列都没有消费者,则将该消息返回给生产者

执行如下代码

immediate_true

你会发现报错

2024-06-01 16:16:06|AMQP Connection 10.5.108.226:5672|org.springframework.amqp.rabbit.connection.CachingConnectionFactory|ERROR|1575|Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=540, reply-text=NOT_IMPLEMENTED - immediate=true, class-id=60, method-id=40)

这是因为从 RabbitMQ 3.0 版本开始去掉了对 immediate 参数的支持,对此官方解释如下

immediate 参数会影响镜像队列的性能,增加了代码复杂性,建议采用 TTLDLX 替代 immediate

概括来讲,mandatory 针对的是消息能否路由到至少一个队列中,否则将消息返回给生产者。immediate 针对的是消息能否立即投递给消费者,否则将消息直接返回给生产者,不用将消息存入队列而等待消费者

Alternate Exchange

生产者在发送消息时,如果不设置 mandatory 参数(或设置为 false),那么消息在未被路由的情况下会丢失;如果设置了 mandatory(且设置成 true),那么需要添加对应的 ReturnListener 逻辑,生产者的代码会变得复杂。如果既不想增加生产者的复杂,又不想消息丢失,那么就可以使用备份交换器(Alternate Exchange),将未被路由的消息存储在 RabbitMQ 中,在需要的时候再去处理这些消息

实现代码如下

备份交换器实现

执行如下测试代码

alternate_测试代码

消息通过 com.qsl.normal.exchange ,经路由键 123 未匹配到任何队列,此时消息就会发送给 com.qsl.normal.exchange 的备份交换器 com.qsl.alternate.exchange,因为备份交换器的类型是 fanout,所以消息会被路由到 com.qsl.alternate.exchange 绑定的所有队列上,目前只有一个队列 com.qsl.unrouted.queue ,所以消息最终来到 com.qsl.unrouted.queue,消息流转如下

备用交换器

RabbitMQ 控制台看队列状况如下

alternate_测试结果

备份交换器和普通的交换器没有太大的区别,为了方便使用,推荐选择 fanout 类型;你们也可以选择其他类型,比如 directtopic,但此时需要保证消息被重新路由到备份交换器的路由键和生产者发出的路由键是一样的,否则消息不能正确路由到备份交换器的队列中,消息会丢失!

关于备份交换器,以下几种特殊情况需要注意

  • 如果设置的备份交换器不存在,客户端和 RabbitMQ 服务器都不会产生异常,此时消息丢失
  • 如果备份交换器没有绑定任何队列,客户端和 RabbitMQ 服务器都不会产生异常,此时消息丢失
  • 如果备份交换器没有任何匹配的队列,客户端和 RabbitMQ 服务器都不会产生异常,此时消息丢失
  • 如果备份交换器和 mandatory 参数一起使用,mandatory 会失效

过期时长(TTL)

TTL,Time to Live 的简称,字面意思生存时长,也有很多人称过期时间,个人更习惯称过期时长

消息的 TTL

RabbitMQ 有两种方法对消息设置过期时长

  1. 通过队列属性设置,队列中的所有消息都有相同的过期时长
  2. 对消息本身进行单独设置,每条消息的过期时长可以不同

如果两种方法一起使用,则消息的过期时长以两者之间较小值为准(而非单纯的以消息的过期时长为准)

消息在队列中的生存时间一旦超过设置的过期时长,就会变成 死信(Dead Message) ,消费者将无法再通过正常的路由收到该消息

可以通过绑定 死信队列 来消费 Dead Message

通过队列属性 x-message-ttl 可以设置消息的过期时长,单位是毫秒,示例代码如下

消息ttl队列

如果不设置 TTL,消息不会过期;如果 TTL 设置成 0,则表示除非此时可以将消息直接投递给消费者,否则该消息直接被丢弃,这个特性是不是看起来很眼熟?回过头去看看 immediatetrue 时的第 1 个特性

1.部分队列有消费者,有消费者的队列会立即将消息投递给消费者,没有消费者的队列会丢弃该消息

通过参数 expiration 可以单独设置每个消息的过期时长,单位也是毫秒,示例代码如下

消息ttl

这两种方法的过期策略是怎样的,大家思考下再往下看

对于设置队列属性 x-message-ttl 的方法,队列中的消息具有相同的过期时长,队列中已过期的消息肯定是在队列头部,RabbitMQ 只需要定期的从队头开始往队尾扫描,一旦消息过期则从队列中剔除,一旦扫描到 未过期 的消息,则本次扫描完成

对于设置参数 expiration 的方法,每个消息可以设置不同的过期时长,那么过期的消息不一定在队列头部,如果要删除队列中所有过期消息,只能扫描整个队列,此时的成本是比较高的,所以采用惰性删除,即消息即将被投递给消费者时做过期判定,如果过期则进行删除

如果既设置了队列属性 x-message-ttl,又设置了 expiration,那该如何判定消息是否过期了呢?

定期删除 + 惰性删除,Redis 的过期策略是不是也是这个?

队列的 TTL

这里针对的是队列,而非队列中的消息,大家别和 消息的 TTL 搞混了

通过参数 x-expires 可以设置队列被自动删除前处于未使用状态的时长,单位是毫秒,不能设置为 0

未使用状态需要满足三点

  1. 队列上没有任何消费者
  2. 队列也没有被重新声明
  3. 过期时间段内未调用过 Basic.Get 命令

RabbitMQ 能保证在过期时长到达后将队列删除,但不保障及时。RabbitMQ 重启后,持久化的队列的过期时长会被重新计算

如下是创建一个过期时长为 30 分钟的队列

ttl队列

队列信息如下

ttl队列 rabbitmq控制台

死信队列

死信队列 之前,我们得先了解 DLX,全称 Dead-Letter-Exchange,中文翻译成 死信交换器

当消息在一个队列中变成死信之后

消息变成死信的情况包括以下3种

  1. 消息被决绝(Basic.Reject/Basic.Nack),并设置参数 requeuefalse
  2. 消息过期
  3. 队列达到最大长度

它能被重新发送到另一个交换器中,这个交换器就是 DLX,而绑定到 DLX 的队列就是 死信队列

DLX 也是一个正常的交换器,和一般的交换器没有区别,它可以和任何队列进行绑定,当绑定的队列中存在死信时,RabbitMQ 就会自动将这个消息重新发布到设置的 DLX 上,进而被路由到 死信队列死信队列 也是可以被监听的,也可以有消费者对 死信队列 中的消息进行消费处理的

所以,死信队列 可以变相的实现 immediatetrue 时的第 2 种情况

2.全部队列都没有消费者,则将该消息返回给生产者

为什么是 变相,因为不是直接将消息返回给生产者,而是生产者可以监听 死信队列 ,使消息回到生产者;虽然结果一致,但实现方式还是有区别的

那么 immediatetrue 的特性,就可以用 TTL + 死信队列 来替代了

通过参数 x-dead-letter-exchange 可以给队列添加 DLX;通过参数 x-dead-letter-routing-key 可以给这个 DLX 指定路由键,如果未配置该参数,则使用原队列的路由键,实现代码如下

DLX实现

执行如下测试代码

DLX_Test

消息通过交换器 com.qsl.normal.exchange,经路由键 ttlMessage 匹配到队列 com.qsl.message.ttl.queue 中,队列设置了 x-message-ttl 为 3000 毫秒,这段时长内队列上没有消费者消费这条消息,消息过期。由于给队列设置了死信交换器 com.qsl.dlx.exchange,消息会通过该交换器,经路由键 dlx_routing_key 匹配到队列 com.qsl.dlx.queue 中,消息最终存储在该死信队列中,消息流转如下

RabbitMQ 进阶-死信队列

RabbitMQ 控制台,可以看到队列状况如下

死信队列状况

DLX 是一个非常有用的特性,它可以处理异常情况下,消息不能够被消费者正确消费而被置入到死信队列中,保证消息不被丢失;后续分析程序可以通过消费死信队列中的消息来分析当时所遇到的异常情况,进而改善和优化系统

DLX 还有一个很重要的功能,它配合 TTL 可以实现延迟队列,具体实现请往下看

延迟队列

延迟队列存储的对象是延迟消息

延迟消息 指的是需要延迟消费的消息

就是当消息发送之后,并不想让消费者立即拿到消息,而是等待特定时长后,消费者才拿到消息进行消费

延迟队列的使用场景有很多,例如:

  1. 订单系统中,下单完成之后 30 分钟内完成支付,否则取消订单
  2. 用户注册成功后,如果三天内没有登陆则进行短信提醒
  3. 远程控制扫地机器人,2 个小时后进行房间打扫
  4. ...

RabbitMQ 本身并没有直接支持 延迟队列 的功能,但是可以通过 DLXTTL 模拟出 延迟队列 的功能,具体实现已经在上一节(死信队列)中完成了,你们可以网上翻一翻

给大家演示 场景1 的完整示例,时间改成 1 分钟内完成支付

生产者端配置

order 配置

消费者端配置

order 消费者

消息发送

order 消息发送

输出日志如下

order 日志

实际应用中,可以根据延迟时长给延迟队列划分多个等级,例如

RabbitMQ 进阶-多维度死信队列

目前 RabbitMQ 提供了另外的方式来实现 延迟队列

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

感兴趣的可以去看看

总结

  1. 示例代码:spring-boot-rabbitmq

  2. mandatory 与 immediate

    mandatory 针对的是消息能否路由到至少一个队列中,否则将消息返回给生产者

    immediate 针对的是消息能否立即投递给消费者,否则将消息直接返回给生产者,不用将消息存入队列而等待消费者

    RabbitMQ 3.0 版本开始去掉了对 immediate 参数的支持,可以用 DLXTTL 来代替

  3. 过期时长

    消息的过期时长有两种设置方式:队列的参数 x-message-ttl 和消息的参数 expiration

    队列也可以设置过期时长,该时长内队列一直处于未使用状态则会被删除;通过队列参数 x-expires 来设置

  4. 死信队列

    绑定到死信交换器(DLX)上的队列就是死信队列

    DLX 能够保证异常的情况下消息不会丢失,后续通过分析死信队列中的消息,可以改善和优化系统

  5. 延迟队列

    目前来讲,实现延迟队列的方式有两种

    1. DLXTTL
    2. rabbitmq-delayed-message-exchange

参考

《RabbitMQ实战指南》

与RabbitMQ 进阶使用之延迟队列 → 订单在30分钟之内未支付则自动取消相似的内容:

RabbitMQ 进阶使用之延迟队列 → 订单在30分钟之内未支付则自动取消

开心一刻 晚上,媳妇和儿子躺在沙发上 儿子疑惑的问道:妈妈,你为什么不去上班 媳妇:妈妈的人生目标是前20年靠父母养,后40年靠你爸爸养,再往后20年就靠你和妹妹养 儿子:我可养不起 媳妇:为什么 儿子:因为,呃...,我和你的想法一样 讲在前面 如果你们对 RabbitMQ 感到陌生,那可以停止往

上周热点回顾(6.3-6.9)

热点随笔: · C#开源实用的工具类库,集成超过1000多种扩展方法 (追逐时光者)· RabbitMQ 进阶使用之延迟队列 → 订单在30分钟之内未支付则自动取消 (青石路)· .Net 中间件 - 新开源代码生成器 -ReZero (阿妮亚)· C#.Net筑基-String字符串超全总结 [深

在C#中使用RabbitMQ做个简单的发送邮件小项目

在C#中使用RabbitMQ做个简单的发送邮件小项目 前言 好久没有做项目了,这次做一个发送邮件的小项目。发邮件是一个比较耗时的操作,之前在我的个人博客里面回复评论和友链申请是会通过发送邮件来通知对方的,不过当时只是简单的进行了异步操作。 那么这次来使用RabbitMQ去统一发送邮件,我的想法是通过

[转帖]Redis进阶(发布订阅,PipeLine,持久化,内存淘汰)

目录 1、发布订阅 1.1 什么是发布订阅 1.2 客户端实例演示 1.3 Java API演示 1.4 Redis发布订阅和rabbitmq的区别 2、批量操作 2.1 普通模式与 PipeLine 模式 2.2 适用场景 2.3 源码解析 2.4 Pipelining的局限性 2.5 事务与 L

[转帖]RabbitMQ学习笔记04:Publish/Subscribe

参考资料:RabbitMQ tutorial - Publish/Subscribe — RabbitMQ 前言 在 RabbitMQ学习笔记03:Work Queues 中,每个进入队列中的消息只会被投递给一个消费者进程。而在这篇文章中,我们将会把一条消息同时投递给多个消费者进程。这种模式也叫做p

RabbitMQ保姆级教程最佳实践

一、消息队列介绍 1、消息队列概念 1、MQ全称为Message Queue,消息队列(MQ)是⼀种应⽤程序对应⽤程序的通信⽅法。 应⽤程序通过读写出⼊队列的消息(针对应⽤程序的数据)来通信,⽽⽆需专⽤连接来 链接它们。 2、消息传递指的是程序之间通过在消息中发送数据进⾏通信,⽽不是通过直接调⽤彼此

说说RabbitMQ延迟队列实现原理?

使用 RabbitMQ 和 RocketMQ 的人是幸运的,因为这两个 MQ 自身提供了延迟队列的实现,不像用 Kafka 的同学那么苦逼,还要自己实现延迟队列。当然,这都是题外话,今天咱们重点来聊聊 RabbitMQ 延迟队列的实现原理,以及 RabbitMQ 实现延迟队列的优缺点有哪些? 很多人

5分钟带你了解RabbitMQ的(普通/镜像)集群

通过本文我们深入了解了RabbitMQ的集群模式及其优缺点。无论是普通集群还是镜像集群,都有其适用的场景和局限性。普通集群利用Erlang语言的集群能力,但消息可靠性和高可用性方面存在一定挑战;而镜像集群通过主动消息同步提高了消息的可靠性和高可用性,但可能会占用大量网络带宽。因此,在选择集群方案时,...

RabbitMQ+redis+Redisson分布式锁+seata实现订单服务

引言 订单服务涉及许多方面,分布式事务,分布式锁,例如订单超时未支付要取消订单,订单如何防止重复提交,如何防止超卖、这里都会使用到。 开启分布式事务可以保证跨多个服务的数据操作的一致性和完整性, 使用分布式锁可以确保在同一时间只有一个操作能够成功执行,避免并发引起的问题。 订单流程(只展示重要的内容

RabbitMQ 3.7.9版本中,Create Channel超时的常见原因及排查方法

在RabbitMQ 3.7.9版本中,Create Channel超时的常见原因及排查方法如下: 常见原因 网络问题: 网络延迟或不稳定可能导致通信超时。 网络分区(network partition)可能导致部分节点无法访问。 资源限制: RabbitMQ服务器上的文件描述符或句柄数量限制。 服务