[转帖]RabbitMQ学习笔记03:Work Queues

rabbitmq,学习,笔记,work,queues · 浏览次数 : 0

小编点评

## RabbitMQ 消息持久化和轮询 **代码摘要** 该代码模拟多个消费者进程,通过 basic_qos方法加上参数prefetch_count=1使得空闲的消费者进程可以收到消息,即使按照轮询的法则这条消息可能不是其。 **主要功能** * 消息持久化:通过使用 pika.spec.PERSISTENT_DELIVERY_MODE 参数设置,确保消息和队列可以持久化。 * 轮询:使用 channel.start_consuming()方法启动多个消费者进程,每个进程可以接收并处理消息。 * 消息确认机制:通过 channel.basic_ack()方法确保当消费者进程发生意外的时候,消息不会因此丢失从而“无人处理”。 * 消息和队列的持久化:通过设置 pika.spec.PERSISTENT_DELIVERY_MODE 参数设置,确保消息和队列可以持久化。 **详细代码** ```python # worker.py import pika import sys import time # 连接到 RabbitMQ 服务 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) # 创建通道 channel = connection.channel() # 创建队列 channel.queue_declare(queue='task_queue', durable=True) # 打印消息 print(' [*] Waiting for messages. To exit press CTRL+C') # 定义回调函数 def callback(ch, method, properties, body): print("\ [x] Received %r" % body.decode()) time.sleep(body.count(b'.')) print("\ [x] Done") ch.basic_ack(delivery_tag=method.delivery_tag) # 启动消费者进程 channel.basic_consume(queue='task_queue', on_message_callback=callback) # 启动服务 connection.start_consuming() # 新的消费者进程 new_task.py#!/usr/bin/env python import pika sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or \"Hello World!\" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE, )) # 关闭连接 connection.close() ``` **总结** 该代码模拟多个消费者进程,通过 basic_qos方法加上参数prefetch_count=1使得空闲的消费者进程可以接收消息,即使按照轮询的法则这条消息可能不是其。

正文

 

 

参考资料:RabbitMQ tutorial - Work Queues — RabbitMQ 

 

 

前言

这篇文章我们会创建一个Work Queue,它会在多个worker(即消费者 consumer)中分发耗时的任务。Work Queue也叫做Task Queue是为了避免当处理一个占用资源的任务时必须等待它完成。相反,我们调度这个任务晚点再完成。我们将任务封装成消息送入队列中。worker进程会在后台工作直到最终完成这个任务。当运行许多worker的时候,这个任务会在它们之间共享。

这个概念在web应用程序中特别有用,这使得在一次短的HTTP请求窗口中处理复杂的任务成为了可能。

之前的任务我们发送给消息队列的消息是简单的Hello World!,这次我们会发送一些字符串来表示复杂任务。我们并没有一个真实的任务,比如需要resized的图片或者需要渲染的PDF文件,因此我们只能用time.sleep()函数来表示我们在处理一个复杂任务处于忙碌的状态。我们使用小数点来表示问题的复杂程度,小数点越多,问题越复杂,耗时越久。每个小数点表示耗时1秒钟。例如如果有一个任务是Hello...,那么这个任务就耗时3秒。

我们需要稍微修改一下send.py,允许从CLI发送任意的消息。这个程序会安排任务到我们的队列中,因此我们将其命名为new_task.py 

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print(" [x] Sent %r" % message)

这里我们贴一下当前的new_task.py完整代码。

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

receive.py同样需要修改:它需要根据小数点的数量来模拟复杂程序的处理。它会从队列中弹出消息并处理任务。我们将其命名为worker.py

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))
    print(" [x] Done")

这里我们贴一下当前的worker.py完整代码。

#!/usr/bin/env python
import pika, sys, os
import time

def main():
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body.decode())
        time.sleep(body.count(b'.'))
        print(" [x] Done")

    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

 

 

Round-robin dispatching

使用Task Queue的一个优点就是实现简单的并行工作能力,如果我们在处理大量堆积工作的时候,我们可以通过简单地增加worker进程来扩展。

这里我们开启3个终端,前2个终端都运行python worker.py用来等待消息。

在第三个终端,我们输入如下。也就是说我们一共发送了5条消息。

[root@rabbitmq-01 code]# python new_task.py First message.
 [x] Sent 'First message.'
[root@rabbitmq-01 code]# python new_task.py Second message..
 [x] Sent 'Second message..'
[root@rabbitmq-01 code]# python new_task.py Third message...
 [x] Sent 'Third message...'
[root@rabbitmq-01 code]# python new_task.py Fourth message....
 [x] Sent 'Fourth message....'
[root@rabbitmq-01 code]# python new_task.py Fifth message.....
 [x] Sent 'Fifth message.....'

随后我们回到前两个终端查看结果。

第一个终端。

[root@rabbitmq-01 code]# python worker.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Done
 [x] Received 'Third message...'
 [x] Done
 [x] Received 'Fifth message.....'
 [x] Done
^CInterrupted

第二个终端。

[root@rabbitmq-01 code]# python worker.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Done
 [x] Received 'Fourth message....'
 [x] Done
^CInterrupted

这5条消息会依次以轮询round-robin的方式被2个worker(消费者)处理掉。

woker.py中我们设置了time.sleep(),通过等待来模拟处理复杂任务,因此在前2个终端,小数点越多的消息我们等待其出现done的时长越久。

 

 

Message acknowledgment

某些任务可能需要执行几分钟或者更长的时间,如果我们在任务执行完毕之前就停止消费者的运行,会发生什么?

在我们目前的代码基础上,一旦RabbitMQ把消息投递给消费者,或者说一旦消费者从队列中消费掉消息,那么这条消息就会被标记为已删除(deletion)。如果消费者程序还在处理消息我们就终止了消费者程序,那么这条消息就会丢失。分配给消费者的消息但是没有被处理,那么这条消息也会丢失。

RabbitMQ 支持消息确认机制message acknowledgements),通过此机制消费者可以告诉RabbitMQ消息是否已经收到、处理,这样子RabbitMQ就可以放心地删除这条消息了。

如果消费者挂了(比如channel、connection关闭或者TCP连接丢失了)导致没有发送ack,RabbitMQ就会知道那条消息没有被完全的处理就会把它重新放回队列中。如果此时有其他的消费者连接着队列,那么这条消息就可以被其他的消费者处理掉。

消费者必须返回ack确认,否则就会超时。默认的超时时长是30分钟。这样就避免了消费者程序“卡住了”导致没有返回ack。ack超时时长的修改请参考Delivery Acknowledgement Timeout

手动的消息确认默认是启用的。在之前的代码中,我们通过auto_ack=True将它关闭了。现在我们要启用消息确认了。

修改callback函数和channel.basic_consume 

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.') )
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(queue='hello', on_message_callback=callback)

参考上面的测试示例,我们打开2个终端,运行worker.py。随后在第三个终端执行一个10秒的长任务python new_task.py Long message.......... 

此时第一个终端的worker.py就会收到并处理消息,同时第二个终端依然处于等待消息的状态。随后我们在第一个终端终止worker.py

[root@rabbitmq-01 code]# python worker.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Long message..........'
^CInterrupted

 此时第二个终端就会有消息进来了,证明我们的消息确认机制有正常工作,等待10秒,任务完毕。

[root@rabbitmq-01 code]# python worker.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Long message..........'
 [x] Done

即使在消息抵达队列然后被消费者消费之后,我们关闭所有终端的worker.py。消息依然会存在于队列中,等待新的消费者连接到队列上。

消息的确认必须和消息送达消费者的channel一致,尝试不同的channel去确认消息会导致channel级别的异常发生。

Forgotten acknowledgment

我们很容易忘记使用basic_ack来确认消息,但是这个会导致严重的问题。如果我们忘记确认的话,当客户端程序(应该是指消费者)退出的时候,消息会被重新投递。但是RabbitMQ会吃掉越来越多的内存,因为它无法释放未确认的消息。

可以使用以下命令查看未确认的消息数量。

[root@rabbitmq-01 rabbitmq_server-3.11.5]# ./sbin/rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages_ready	messages_unacknowledged
hello	0	0

 

 

Message durability

通过消息的确认机制,我们已经学会如何确保当消费者程序出现问题的时候,消失不会丢失。但是如果RabbitMQ服务甚至服务器挂了的话,那么我们的消息还是会丢失的。

RabbitMQ挂了的时候,默认情况下,消息和队列都会被RabbitMQ 遗忘,除非我们让队列和消息都变成持久的(durable)

首先我们将队列声明为持久的。

channel.queue_declare(queue='hello', durable=True)

这句代码本身是没有问题的,但是由于我们的环境已经存在了一个名为hello的队列并且其是非持久的,此时我们再声明一个持久的同名队列,就会报错。因为RabbitMQ不支持同一个队列使用不同参数来声明。 

我们只需要换个新名称声明队列即可。

channel.queue_declare(queue='task_queue', durable=True)

因为生产者和消费者都有声明队列的代码,因此两边都需要修改。

worker.py 里面还要消费者消费队列的代码,也要修改队列名称。

现在,我们就可以确保当重启 RabbitMQ 服务或者服务器时,队列task_queue不会丢失了。

接下来我们需要确保消息是持久的。通过提供属性delivery_mode和值pika.spec.PERSISTENT_DELIVERY_MODE 

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
                      ))

测试方式也很简单,代码修改好之后, 先push一条消息。

[root@rabbitmq-01 code]# python new_task.py Very Long message....................
 [x] Sent 'Very Long message....................'

查看队列。

[root@rabbitmq-01 rabbitmq_server-3.11.5]# ./sbin/rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
task_queue	1
hello	0

但是我们不启用worker.py来处理消息,我们直接停止服务或者重启服务器(记得确保开机启动)。

./sbin/rabbitmqctl stop
./sbin/rabbitmq-server -detached

再次查看,就会发现队列和消息都还在。

[root@rabbitmq-01 rabbitmq_server-3.11.5]# ./sbin/rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name	messages
task_queue	1

此时再启用worker.py,确保可以正常消费消息。

[root@rabbitmq-01 code]# python worker.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Very Long message....................'
 [x] Done

Note on message persistence

消息和队列的持久化并不能100%确保消息和队列不会丢失。当消息抵达RabbitMQ的时候,可能还没来得及将其送内存写入磁盘,服务或者服务器就挂了。

又或者还没有到写入磁盘的时间。这个和fsync(2)有关系。

如果需要更强的持久化机制,请参考 publisher confirms 

 

 

Fair dispatch

目前我们的消费者的工作方式是轮询的,终端1和终端2的worker.py轮流去队列中获取消息并处理。假设某个终端的worker.py 已经空闲同时另一个终端的worker.py 处于忙碌状态,即便此时队列中有消息,只要这条消息轮到了忙碌的那个worker.py ,那么空闲的worker.py 也不会去抢这条消息,而是等待忙碌的worker.py 忙完了,让它去处理。

我们来演示一下。首先同样是打开两个终端,运行worker.py 。

随后在第三个终端运行new_task.py,发送4条消息,奇数消息执行时间短(1秒),偶数消息执行时间长(24秒)。

每次发布完消息我们都等待差不多2秒的时间,再发布下一条。

[root@rabbitmq-01 code]# python new_task.py Odd message .
 [x] Sent 'Odd message .'
[root@rabbitmq-01 code]# python new_task.py Even message ........................
 [x] Sent 'Even message ........................'
[root@rabbitmq-01 code]# python new_task.py Odd message .
 [x] Sent 'Odd message .'
[root@rabbitmq-01 code]# python new_task.py Even message ........................
 [x] Sent 'Even message ........................'

我们来看第一个终端。

[root@rabbitmq-01 code]# python worker.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Odd message .'
 [x] Done
 [x] Received 'Odd message .'
 [x] Done

第二个终端。

[root@rabbitmq-01 code]# python worker.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Even message ........................'
 [x] Done
 [x] Received 'Even message ........................'
 [x] Done

实际在发布第四条消息的时候,第一个终端已经处理完2条Odd message .了,已经处于空闲状态。而第二个终端还在处理第一条Even message ........................ 

即便如此,第四条消息依然会等待第二个终端空闲了再给它处理。

这样就很不智能,效率低下。

我们可以使用basic_qos方法加上参数prefetch_count=1来解决这个问题。它会告诉RabbitMQ,在同一时间只能提供1条消息给消费者。换句话说,不要将消息发送给消费者,除非它们已经处理并确认了手头的消息。

channel.basic_qos(prefetch_count=1)

也就是说,现在可以实现智能分配消息了,根据消费者的忙碌情况,而不再是死板的轮询了。

测试结果和我们预想的一样。

# 终端一
[root@rabbitmq-01 code]# python worker.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Odd message .'
 [x] Done
 [x] Received 'Odd message .'
 [x] Done
 [x] Received 'Even message ........................'
 [x] Done
 
# 终端二
 [root@rabbitmq-01 code]# python worker.py 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Even message ........................'
 [x] Done

Note about queue size

如果所有的消费者都处于忙碌的状态,那么队列中的消息就会一点点积累,可能会导致队列被填满、堵塞。

此时需要考虑增加额外的消费者或者使用 message TTL 。

 

 

Putting it all together

最后把代码整合一下,整合后的代码和之前的有点不同,比如woker.py没有定义main函数,没有定义异常捕获。不过不会影响实际的功能。

worker.py

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)

channel.start_consuming()

new_task.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body=message,
    properties=pika.BasicProperties(
        delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE,
    ))
print(" [x] Sent %r" % message)
connection.close()

 

 

总结

这篇博文中,模拟了多个消费者进程。使用time.sleep()来模拟不同耗时程度的任务。

通过多个消费者进程+多次生产消息,我们知道默认情况下,队列使用轮询的方式来工作。

消息确认机制的存在帮助我们确保当消费者进程发生意外的时候,消息不会因此丢失从而“无人处理”。

消息和队列的持久化机制确保了当服务或者服务器宕机或者重启的时候,消息和队列不会丢失。

轮询某种程度太过死板,我们通过basic_qos方法加上参数prefetch_count=1使得空闲的消费者进程可以收到消息,即便按照轮询的法则这条消息可能不是它的。

与[转帖]RabbitMQ学习笔记03:Work Queues相似的内容:

[转帖]RabbitMQ学习笔记03:Work Queues

参考资料:RabbitMQ tutorial - Work Queues — RabbitMQ 前言 这篇文章我们会创建一个Work Queue,它会在多个worker(即消费者 consumer)中分发耗时的任务。Work Queue也叫做Task Queue是为了避免当处理一个占用资源的任务时必

[转帖]RabbitMQ学习笔记04:Publish/Subscribe

参考资料:RabbitMQ tutorial - Publish/Subscribe — RabbitMQ 前言 在 RabbitMQ学习笔记03:Work Queues 中,每个进入队列中的消息只会被投递给一个消费者进程。而在这篇文章中,我们将会把一条消息同时投递给多个消费者进程。这种模式也叫做p

[转帖]RabbitMQ学习笔记01:初识与安装

https://www.cnblogs.com/alongdidi/p/rabbitmq_overview.html 原作者写的真好. 前言 本人是一名运维工程师,在此公司接触到 RabbitMQ ,平时针对此软件的工作内容就是集群的安装以及配置监控等,对其的理解也仅仅是知道其是一种消息队列的服务,

[转帖]RabbitMQ学习笔记02:Hello World!

参考资料:RabbitMQ tutorial - "Hello world!" — RabbitMQ 前言 RabbitMQ是一个中间人,它接受和转发消息。我们可以把它想象成一个邮局:当你把邮件投入邮箱的时候,你可以确信它最终会被投递到收件人的手中。RabbitMQ就是那个邮箱、邮局和邮差。区别就在

[转帖]RabbitMQ学习笔记05:Routing

参考资料:RabbitMQ tutorial - Routing — RabbitMQ 前言 在之前的文章中我们构建了一个简单的日志系统,它可以广播消息到多个消费者中。 在这篇文章中,我们打算实现仅订阅消息的子集(即不是所有的消息,仅仅只是一部分消息。注意,这里不是说一条消息的一部分)。例如我们只会

[转帖]RabbitMQ学习笔记06:Topics

https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_five.html 前言 在上一篇博文中我们使用direct类型的exchange改善了我们的日志系统,但是它仍然有一定的限制,它没有办法基于多个条件路由消息。 我们可能不仅仅希望基于日志级别

[转帖]RabbitMQ学习笔记07:RPC

https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_six.html 参考资料:RabbitMQ tutorial - Remote procedure call (RPC) — RabbitMQ Remote Procedure Call W

[转帖]RabbitMQ:Exchange的Fanout类型的介绍和使用

https://blog.csdn.net/weixin_45492007/article/details/106095591 1.声明 当前的内容用于本人学习和使用Fanout类型的Exchange,主要理解其主要作用 2.Fanout Exchange的官方介绍 扇出交换机将消息路由到与其绑定的

[转帖]RabbitMQ服务优化,修改最大连接数

https://www.cnblogs.com/hoyeong/p/16242202.html RabbitMQ的优化RabbitMQ的连接数是压垮消息队列的一个重要的指标。所以在平时使用OpenStack平台的过程中,如果大量的用户同时创建虚拟机,会导致云平台创建报错,其实就是消息队列服务的崩溃。

[转帖]RabbitMQ性能优化

https://www.cnblogs.com/zhengchunyuan/p/9253728.html 修改rabbitmq.config文件 rabbitmq.config文件时rabbitmq的配置文件,他遵守Erlang配置文件定义。 rabbitmq.config文件位置: Unix $R