[转帖]RabbitMQ 消费者回执和发布确认

rabbitmq,消费者,回执,发布,确认 · 浏览次数 : 0

小编点评

**消费者回执(Consumer Acknowledgement)** 在 AMQP 0-9-1 中,消费者处理完消息后返回 acknowledgement,被称为消费者回执(Consumer Acknowledgement)。中间件收到生产者的消息后返回 acknowledgement,被称为发布确认(Publisher Confirm)。 **消费者回执** * 当 RabbitMQ 把消息投递给消费者时,它需要知道消费者是否成功接收并处理了消息。 * 消费者可以通过调用 `basic.ack` 方法来发送 positive ack,表明消息已成功接收并处理。 * 消费者可以通过调用 `basic.reject` 方法来发送 negative ack,表明消费者没有成功处理消息。 **发布确认** * 当生产者将消息发送出去之后,消息是否能成功地到达服务器呢? * 默认情况下,发送消息的操作是不会返回任何信息给生产者的,也就是说生产者根本不知道消息有没有成功到达 RabbitMQ 服务器。 * 为了解决这个问题,RabbitMQ 提供了一个改进方案——发布确认机制(Publisher Confirm)。 **事务机制** * 事务机制是一种隔离机制,它可以确保消息发送方和 RabbitMQ 之间消息确认的问题。 * 当消息成功被 RabbitMQ 接收时,事务提交成功,否则便在捕获异常之后进行事务回滚。 **发布确认机制** * 发布确认机制是异步的,可以是批量确认或异步确认。 * 在批量确认中,每发送一批消息后,调用 `channel.waitForConfirms` 方法,等待服务器的确认返回。 * 在异步确认中,当消息得到确认之后,客户端会回调这个方法进行处理。

正文

为了保证数据安全,消费者和生产者的回执(ack)都是非常重要的。

由于我们无法保证消息都能像我们期望的那样,正常到达另一端或者被 Consumer 消费成功。因此,publisher 和 consumer 都需要一种机制,来确保消息投递成功了和消息消费成功了。

在 AMQP 0-9-1 中,消费者处理完消息后返回 acknowledgement,被称为消费者回执(Consumer Acknowledgement)。中间件收到生产者的消息后返回 acknowledgement,被称为发布确认(Publisher Confirm)。

无论是把消息从生产者投递到 RabbitMQ 节点,还是把消息从 RabbitMQ 节点投递到消费者,为了保证消息投递的可靠性和安全性,Publisher Confirm 和 Consumer Acknowledgement 都是必需的。

消费者回执

当 RabbitMQ 把消息投递给消费者,它需要知道消费者是否成功接收并处理了消息。什么样的逻辑是最优的取决于你的系统,因此,它主要是由应用程序根据实际情况来决定的。

在 AMQP 0-9-1 中,消费者回执是在调用 basic.consume 方法或 basic.get 方法时实现的。

其实,在 RabbitMQ 入门指南(二)中提到的 Message acknowledgment 就是消费者回执。

Delivery Tag

Delivery Tag 就是 Message Delivery 的标识符,它是一个 64 位的长整型值。

当消费者注册成功后,RabbitMQ 将会调用 basic.deliver 方法,该方法会携带唯一的 delivery_tag,将消息发送给消费者。

delivery_tag 的作用域是进行通信的 channel(信道),因此,消息回执必须在相同的 channel 中进行。否则,会导致 unknown delivery tag 异常。

回执模式

根据使用的回执模式(acknowledgement mode),RabbitMQ 会考虑消息是否投递成功了。

如果没有开启手动回执功能(默认关闭),消息一旦发送出去,RabbitMQ 就认为消息投递成功了。

如果开启了手动回执功能,只有当 RabbitMQ 收到了消费者手动返回的 ack 时,才会认为消息投递成功了。

将 autoAck 参数设置为 false,即表示开启手动回执功能。 这时,RabbitMQ 队列中的消息分为了两个部分:

  • 一部分是等待发送给消费者的消息(Ready);
  • 另一部分是已经发送给消费者但还未收到消费者的 ack 信号的消息(Unacked)。

手动发送的回执,可以是 positive(乐观的),也可以是 negative(悲观的)。你可以使用下面的方法来手动发送回执。

  • basic.ack 用于发送 positive ack,明确表示消费者成功地处理了消息,消息会被 RabbitMQ 删除。
  • basic.reject 用于发送 negative ack,明确表示消费者没有成功处理消息,即拒绝消息。其中,requeue 参数如果为 false,RabbitMQ 会删除该消息;如果为 true,RabbitMQ 会让该消息重新进入队列,以便重新投递给消费者。basic.reject 每次只能拒绝一条消息。
  • basic.nack 和 basic.reject 的功能类似,但 basic.nack 支持 multiple 参数,它可以一次拒绝多条消息。

手动发送回执通常会结合 prefetch(预读取)功能一起使用。

注意: 如果将 requeue 设置为 false,可以启用【死信队列】的功能。死信队列可以通过检测被拒绝或未送达的消息来追踪和处理问题。

发布确认

当生产者将消息发送出去之后,消息是否能成功地到达服务器呢?

默认情况下,发送消息的操作是不会返回任何信息给生产者的,也就是说生产者根本不知道消息有没有成功到达 RabbitMQ 服务器。那么,消息在到达服务器之前,就可能出现丢失的问题。

为了解决这个问题,RabbitMQ 提供了两种解决方案:

  • 事务机制。
  • 发布确认机制。

事务机制

与事务机制相关的方法有三个:

  • channel.txSelect 用于将当前的信道设置成事务模式,开启事务。
  • channel.txCommit 用于提交事务。
  • channel.txRollback 用于回滚事务。

在开启事务之后,便可以发布消息给 RabbitMQ,如果事务提交成功,则消息一定到达了 RabbitMQ 中,如果在事务提交之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,这时我们便可以捕获异常,并执行 channel.txRollback 方法回滚事务。

注意: RabbitMQ 中的事务机制与 MySQL 中的事务概念并不相同。

事务确实能够解决消息发送方和 RabbitMQ 之间消息确认的问题,只有当消息成功被 RabbitMQ 接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,同时可以重发消息。

但是,使用事务机制会严重降低 RabbitMQ 的消息吞吐量。为此,RabbitMQ 提供了一个改进方案——发布确认机制(生产者确认机制)。

发布确认机制

发布确认机制(Publisher Confirm):生产者将信道设置成 confirm 模式,所有在该信道上发布的消息都会被指定一个唯一的 ID(从 1 开始),一旦消息被发送给所匹配的队列,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这样,生产者便知道消息已经正确到达了 RabbitMQ 服务器。

如果消息和队列是持久化的,那么,Basic.Ack 会在消息写入磁盘之后发出。

事务机制在一条消息发送之后会阻塞发送端,需同步等待 RabbitMQ 的回应,收到回应后才能继续发送下一条消息。相比之下,发布确认机制的优点在于它可以是异步的,一旦发布一条消息,生产者就可以在等待信道返回确认的同时继续发送下一条消息,当消息得到确认之后,生产者便可以通过回调方法来处理该确认消息(Basic.Ack),如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack(Basic.Nack)命令,生产者同样可以在回调方法中处理该 nack 命令。

生产者通过调用 channel.confirmSelect 方法,将信道设置为confirm 模式,之后 RabbitMQ 会返回 Confirm.Select-Ok 命令表示同意生产者将当前信道设置为 confirm 模式。后续该信道中发送的消息都会被 RabbitMQ 服务器 ack 或 nack 一次。

生产者通过调用 channel.waitForConfirms 方法判断是否收到 了RabbitMQ 服务器 的 ack 或 nack。

注意事项:

  • 事务机制和发布确认机制是互斥的,不能在同一个信道中共存。
  • 事务机制和发布确认机制确保的都是把消息成功地发送给 RabbitMQ 的交换器。如果此交换器没有匹配的队列,那么消息也会丢失。发送方可以配合 mandatory 参数或者备份交换器一起使用来提高消息传输的可靠性。
  • 发布确认机制的优势在于它并不一定需要同步确认。

发布确认机制可以采用以下两种方式来提升 confirm 的效率。

  • 批量 confirm:每发送一批消息后,调用channel.waitForConfirms 方法,等待服务器的确认返回。相比普通的 confirm,批量 confirm 极大地提升了confirm 的效率,但问题在于出现 Basic.Nack 或者超时情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且当消息经常丢失时,批量 confirm 的性能或许是不升反降的。
  • 异步 confirm:提供一个回调方法,服务端确认了一条或多条消息后,客户端会回调这个方法进行处理。

异步 confirm

异步 confirm 的逻辑实现比较复杂。

Channel 中提供了 addConfirmListener 方法,可以添加 ConfirmListener 这个回调接口。

ConfirmListener 接口包含两个方法:handleAck 和 handleNack,分别用来处理 RabbitMQ 回传的 Basic.Ack 和 Basic.Nack。

这两个方法中都包含 delivery_tag 参数(用来标记消息的唯一有序序号) 。我们需要为每一个信道维护一个 unconfirm 的消息序号集合,每发送一条消息,集合中的元素就加 1。每当调用 ConfirmListener 中的 handleAck 方法时,unconfirm 集合就删掉相应的一条(multiple 设置为 false)或者多条(multiple 设置为 true)记录。 unconfirm 集合最好采用有序集合 SortedSet 的存储结构。

</article>

与[转帖]RabbitMQ 消费者回执和发布确认 相似的内容:

[转帖]RabbitMQ 消费者回执和发布确认

为了保证数据安全,消费者和生产者的回执(ack)都是非常重要的。 由于我们无法保证消息都能像我们期望的那样,正常到达另一端或者被 Consumer 消费成功。因此,publisher 和 consumer 都需要一种机制,来确保消息投递成功了和消息消费成功了。 在 AMQP 0-9-1 中,消费者处

[转帖]RabbitMQ学习笔记03:Work Queues

参考资料:RabbitMQ tutorial - Work Queues — RabbitMQ 前言 这篇文章我们会创建一个Work Queue,它会在多个worker(即消费者 consumer)中分发耗时的任务。Work Queue也叫做Task Queue是为了避免当处理一个占用资源的任务时必

[转帖]RabbitMQ高可用性

RabbitMQ消息应答 执行一个任务可能需要花费几秒钟,你可能会担心如果一个消费者在执行任务过程中挂掉了。一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种情况下,如果正在执行任务的消费者宕机,会丢失正在处理的消息和分发给这个消费者但尚未处理的消息。 但是,我们不想丢失任何任务,如

[转帖]RabbitMQ学习笔记04:Publish/Subscribe

参考资料:RabbitMQ tutorial - Publish/Subscribe — RabbitMQ 前言 在 RabbitMQ学习笔记03:Work Queues 中,每个进入队列中的消息只会被投递给一个消费者进程。而在这篇文章中,我们将会把一条消息同时投递给多个消费者进程。这种模式也叫做p

[转帖]RabbitMQ学习笔记05:Routing

参考资料:RabbitMQ tutorial - Routing — RabbitMQ 前言 在之前的文章中我们构建了一个简单的日志系统,它可以广播消息到多个消费者中。 在这篇文章中,我们打算实现仅订阅消息的子集(即不是所有的消息,仅仅只是一部分消息。注意,这里不是说一条消息的一部分)。例如我们只会

[转帖]如何选择RabbitMQ的消息保存方式?

https://www.cnblogs.com/zhengchunyuan/p/10179677.html RabbitMQ对于queue中的message的保存方式有两种方式:disc和ram。如果采用disc,则需要对exchange/queue/delivery mode都要设置成durabl

[转帖]RabbitMQ 如何保证交换机中的消息不丢失

我们知道,生产者会先将消息发送给交换机,但是如果交换机此时没有匹配到相关的队列时,交换机中的消息就会出现丢失的问题。 那么,如何保证交换机中的消息不丢失呢? mandatory 参数 当 basicPublish 方法的 mandatory 参数设为 true 时,如果交换器无法匹配到绑定的队列,那

[转帖]RabbitMQ 的重要概念(术语)

Message 消息指的是 RabbitMQ 的队列中保存的数据。 Producer 消息的生产者,即 message publisher(sender),是指负责创建和发送消息的程序。 Vhost RabbitMQ 的虚拟主机,一个 broker 里可以开设多个 vhost,用作不同用户的权限分离

[转帖]RabbitMQ基础概念详细介绍

https://www.jianshu.com/p/e55e971aebd8 AMQP简介 AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦和通讯。 AMQP的主要

[转帖]RabbitMQ服务优化,修改最大连接数

https://www.cnblogs.com/hoyeong/p/16242202.html RabbitMQ的优化RabbitMQ的连接数是压垮消息队列的一个重要的指标。所以在平时使用OpenStack平台的过程中,如果大量的用户同时创建虚拟机,会导致云平台创建报错,其实就是消息队列服务的崩溃。