参考资料:RabbitMQ tutorial - Work Queues — RabbitMQ
前言
这篇文章我们会创建一个Work Queue
,它会在多个worker
(即消费者 consumer)中分发耗时的任务。Work Queue
也叫做Task Queue
是为了避免当处理一个占用资源的任务时必须等待它完成。相反,我们调度这个任务晚点再完成。我们将任务封装成消息送入队列中。worker
进程会在后台工作直到最终完成这个任务。当运行许多worker
的时候,这个任务会在它们之间共享。
这个概念在web应用程序中特别有用,这使得在一次短的HTTP请求窗口中处理复杂的任务成为了可能。
之前的任务我们发送给消息队列的消息是简单的Hello World!
,这次我们会发送一些字符串来表示复杂任务。我们并没有一个真实的任务,比如需要resized的图片或者需要渲染的PDF文件,因此我们只能用time.sleep()
函数来表示我们在处理一个复杂任务处于忙碌的状态。我们使用小数点来表示问题的复杂程度,小数点越多,问题越复杂,耗时越久。每个小数点表示耗时1秒钟。例如如果有一个任务是Hello...
,那么这个任务就耗时3秒。
我们需要稍微修改一下send.py
,允许从CLI发送任意的消息。这个程序会安排任务到我们的队列中,因此我们将其命名为new_task.py
import sys
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print(" [x] Sent %r" % message)
这里我们贴一下当前的new_task.py
完整代码。
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
routing_key='hello',
body=message)
print(" [x] Sent %r" % message)
connection.close()
receive.py
同样需要修改:它需要根据小数点的数量来模拟复杂程序的处理。它会从队列中弹出消息并处理任务。我们将其命名为worker.py
。
import time
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
time.sleep(body.count(b'.'))
print(" [x] Done")
这里我们贴一下当前的worker.py
完整代码。
#!/usr/bin/env python
import pika, sys, os
import time
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
time.sleep(body.count(b'.'))
print(" [x] Done")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
Round-robin dispatching
使用Task Queue
的一个优点就是实现简单的并行工作能力,如果我们在处理大量堆积工作的时候,我们可以通过简单地增加worker
进程来扩展。
这里我们开启3个终端,前2个终端都运行python worker.py
用来等待消息。
在第三个终端,我们输入如下。也就是说我们一共发送了5条消息。
[root@rabbitmq-01 code]# python new_task.py First message.
[x] Sent 'First message.'
[root@rabbitmq-01 code]# python new_task.py Second message..
[x] Sent 'Second message..'
[root@rabbitmq-01 code]# python new_task.py Third message...
[x] Sent 'Third message...'
[root@rabbitmq-01 code]# python new_task.py Fourth message....
[x] Sent 'Fourth message....'
[root@rabbitmq-01 code]# python new_task.py Fifth message.....
[x] Sent 'Fifth message.....'
随后我们回到前两个终端查看结果。
第一个终端。
[root@rabbitmq-01 code]# python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Done
[x] Received 'Third message...'
[x] Done
[x] Received 'Fifth message.....'
[x] Done
^CInterrupted
第二个终端。
[root@rabbitmq-01 code]# python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Done
[x] Received 'Fourth message....'
[x] Done
^CInterrupted
这5条消息会依次以轮询round-robin
的方式被2个worker
(消费者)处理掉。
在woker.py
中我们设置了time.sleep()
,通过等待来模拟处理复杂任务,因此在前2个终端,小数点越多的消息我们等待其出现done的时长越久。
Message acknowledgment
某些任务可能需要执行几分钟或者更长的时间,如果我们在任务执行完毕之前就停止消费者的运行,会发生什么?
在我们目前的代码基础上,一旦RabbitMQ
把消息投递给消费者
,或者说一旦消费者
从队列中消费掉消息,那么这条消息就会被标记为已删除(deletion)
。如果消费者程序还在处理消息我们就终止了消费者程序,那么这条消息就会丢失。分配给消费者的消息但是没有被处理,那么这条消息也会丢失。
RabbitMQ 支持消息确认机制
(message acknowledgements),通过此机制消费者
可以告诉RabbitMQ
消息是否已经收到、处理,这样子RabbitMQ
就可以放心地删除这条消息了。
如果消费者
挂了(比如channel、connection关闭或者TCP连接丢失了)导致没有发送ack,RabbitMQ
就会知道那条消息没有被完全的处理就会把它重新放回队列中。如果此时有其他的消费者
连接着队列,那么这条消息就可以被其他的消费者
处理掉。
消费者必须返回ack确认,否则就会超时。默认的超时时长是30分钟。这样就避免了消费者程序“卡住了”导致没有返回ack。ack超时时长的修改请参考Delivery Acknowledgement Timeout
。
手动的消息确认默认是启用的。在之前的代码中,我们通过auto_ack=True
将它关闭了。现在我们要启用消息确认了。
修改callback
函数和channel.basic_consume
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
time.sleep(body.count(b'.') )
print(" [x] Done")
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_consume(queue='hello', on_message_callback=callback)
参考上面的测试示例,我们打开2个终端,运行worker.py
。随后在第三个终端执行一个10秒的长任务python new_task.py Long message..........
此时第一个终端的worker.py
就会收到并处理消息,同时第二个终端依然处于等待消息的状态。随后我们在第一个终端终止worker.py
[root@rabbitmq-01 code]# python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Long message..........'
^CInterrupted
此时第二个终端就会有消息进来了,证明我们的消息确认机制有正常工作,等待10秒,任务完毕。
[root@rabbitmq-01 code]# python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Long message..........'
[x] Done
即使在消息抵达队列然后被消费者消费之后,我们关闭所有终端的worker.py
。消息依然会存在于队列中,等待新的消费者连接到队列上。
消息的确认必须和消息送达消费者的channel一致,尝试不同的channel去确认消息会导致channel级别的异常发生。
Forgotten acknowledgment
我们很容易忘记使用basic_ack
来确认消息,但是这个会导致严重的问题。如果我们忘记确认的话,当客户端程序(应该是指消费者)退出的时候,消息会被重新投递。但是RabbitMQ
会吃掉越来越多的内存,因为它无法释放未确认的消息。
可以使用以下命令查看未确认的消息数量。
[root@rabbitmq-01 rabbitmq_server-3.11.5]# ./sbin/rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages_ready messages_unacknowledged
hello 0 0
Message durability
通过消息的确认机制,我们已经学会如何确保当消费者程序出现问题的时候,消失不会丢失。但是如果RabbitMQ
服务甚至服务器挂了的话,那么我们的消息还是会丢失的。
当RabbitMQ
挂了的时候,默认情况下,消息和队列都会被RabbitMQ
遗忘,除非我们让队列和消息都变成持久的(durable)
。
首先我们将队列声明为持久的。
channel.queue_declare(queue='hello', durable=True)
这句代码本身是没有问题的,但是由于我们的环境已经存在了一个名为hello
的队列并且其是非持久的,此时我们再声明一个持久的同名队列,就会报错。因为RabbitMQ
不支持同一个队列使用不同参数来声明。
我们只需要换个新名称声明队列即可。
channel.queue_declare(queue='task_queue', durable=True)
因为生产者和消费者都有声明队列的代码,因此两边都需要修改。
worker.py
里面还要消费者消费队列的代码,也要修改队列名称。
现在,我们就可以确保当重启 RabbitMQ
服务或者服务器时,队列task_queue
不会丢失了。
接下来我们需要确保消息是持久的。通过提供属性delivery_mode
和值pika.spec.PERSISTENT_DELIVERY_MODE
channel.basic_publish(exchange='',
routing_key="task_queue",
body=message,
properties=pika.BasicProperties(
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE
))
测试方式也很简单,代码修改好之后, 先push一条消息。
[root@rabbitmq-01 code]# python new_task.py Very Long message....................
[x] Sent 'Very Long message....................'
查看队列。
[root@rabbitmq-01 rabbitmq_server-3.11.5]# ./sbin/rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
task_queue 1
hello 0
但是我们不启用worker.py
来处理消息,我们直接停止服务或者重启服务器(记得确保开机启动)。
./sbin/rabbitmqctl stop
./sbin/rabbitmq-server -detached
再次查看,就会发现队列和消息都还在。
[root@rabbitmq-01 rabbitmq_server-3.11.5]# ./sbin/rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
task_queue 1
此时再启用worker.py
,确保可以正常消费消息。
[root@rabbitmq-01 code]# python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Very Long message....................'
[x] Done
Note on message persistence
消息和队列的持久化并不能100%确保消息和队列不会丢失。当消息抵达RabbitMQ
的时候,可能还没来得及将其送内存写入磁盘,服务或者服务器就挂了。
又或者还没有到写入磁盘的时间。这个和fsync(2)
有关系。
如果需要更强的持久化机制,请参考 publisher confirms
Fair dispatch
目前我们的消费者的工作方式是轮询的,终端1和终端2的worker.py
轮流去队列中获取消息并处理。假设某个终端的worker.py
已经空闲同时另一个终端的worker.py
处于忙碌状态,即便此时队列中有消息,只要这条消息轮到了忙碌的那个worker.py
,那么空闲的worker.py
也不会去抢这条消息,而是等待忙碌的worker.py
忙完了,让它去处理。
我们来演示一下。首先同样是打开两个终端,运行worker.py
。
随后在第三个终端运行new_task.py,发送4条消息,奇数消息执行时间短(1秒),偶数消息执行时间长(24秒)。
每次发布完消息我们都等待差不多2秒的时间,再发布下一条。
[root@rabbitmq-01 code]# python new_task.py Odd message .
[x] Sent 'Odd message .'
[root@rabbitmq-01 code]# python new_task.py Even message ........................
[x] Sent 'Even message ........................'
[root@rabbitmq-01 code]# python new_task.py Odd message .
[x] Sent 'Odd message .'
[root@rabbitmq-01 code]# python new_task.py Even message ........................
[x] Sent 'Even message ........................'
我们来看第一个终端。
[root@rabbitmq-01 code]# python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Odd message .'
[x] Done
[x] Received 'Odd message .'
[x] Done
第二个终端。
[root@rabbitmq-01 code]# python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Even message ........................'
[x] Done
[x] Received 'Even message ........................'
[x] Done
实际在发布第四条消息的时候,第一个终端已经处理完2条Odd message .
了,已经处于空闲状态。而第二个终端还在处理第一条Even message ........................
即便如此,第四条消息依然会等待第二个终端空闲了再给它处理。
这样就很不智能,效率低下。
我们可以使用basic_qos
方法加上参数prefetch_count=1
来解决这个问题。它会告诉RabbitMQ
,在同一时间只能提供1条消息给消费者。换句话说,不要将消息发送给消费者,除非它们已经处理并确认了手头的消息。
channel.basic_qos(prefetch_count=1)
也就是说,现在可以实现智能分配消息了,根据消费者的忙碌情况,而不再是死板的轮询了。
测试结果和我们预想的一样。
# 终端一
[root@rabbitmq-01 code]# python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Odd message .'
[x] Done
[x] Received 'Odd message .'
[x] Done
[x] Received 'Even message ........................'
[x] Done
# 终端二
[root@rabbitmq-01 code]# python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Even message ........................'
[x] Done
Note about queue size
如果所有的消费者都处于忙碌的状态,那么队列中的消息就会一点点积累,可能会导致队列被填满、堵塞。
此时需要考虑增加额外的消费者或者使用 message TTL 。
Putting it all together
最后把代码整合一下,整合后的代码和之前的有点不同,比如woker.py
没有定义main函数,没有定义异常捕获。不过不会影响实际的功能。
worker.py
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()
new_task.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE,
))
print(" [x] Sent %r" % message)
connection.close()
总结
这篇博文中,模拟了多个消费者
进程。使用time.sleep()
来模拟不同耗时程度的任务。
通过多个消费者
进程+多次生产消息,我们知道默认情况下,队列使用轮询的方式来工作。
消息确认机制的存在帮助我们确保当消费者
进程发生意外的时候,消息不会因此丢失从而“无人处理”。
消息和队列的持久化机制确保了当服务或者服务器宕机或者重启的时候,消息和队列不会丢失。
轮询某种程度太过死板,我们通过basic_qos
方法加上参数prefetch_count=1
使得空闲的消费者
进程可以收到消息,即便按照轮询的法则这条消息可能不是它的。