参考资料:RabbitMQ tutorial - Routing — RabbitMQ
前言
在之前的文章中我们构建了一个简单的日志系统,它可以广播消息到多个消费者中。
在这篇文章中,我们打算实现仅订阅消息的子集(即不是所有的消息,仅仅只是一部分消息。注意,这里不是说一条消息的一部分)。例如我们只会把严重的错误消息写到日志文件中,同时我们会把所有的消息都输出到屏幕上。
Bindings
在之前的示例中我们学会了绑定exchange
和队列。
channel.queue_bind(exchange=exchange_name,
queue=queue_name)
绑定是exchange
和队列之间的关系,我们可以简单理解为某个队列对于来自于某个exchange
的消息感兴趣。
在绑定的过程当中可以使用额外的routing_key
参数。为了避免和basic_publish
参数混淆,我们将其称之为binding_key
。
其实binding_key
就是exchange
和队列在绑定的时候使用的routing_key
。
下面是我们在绑定的时候创建了一个binding_key
。
channel.queue_bind(exchange=exchange_name,
queue=queue_name,
routing_key='black')
binding_key
的含义取决于exchange
的类型,我们之前创建的fanout
类型的exchange
会忽略这个参数的值。
Direct exchange
想要完成前言中我们提到的需求的话,我们需要使用direct
类型的exchange
。direct
的路由算法也十分简单,发送给direct
类型的exchange
的消息一般会带有一个routing_key
参数,这条消息会被路由到带有相同的binding_key
值的队列中。这种匹配是必须准确无误的。
就像这样:
从图中我们可以看到,direct
类型的名为x
的exchange
绑定了2个队列Q1
和Q2
,在与Q1
绑定时的binding_key
是orange
,在与Q2
绑定时共绑定了2次,第一个绑定的binding_key
是black
,第二个绑定的binding_key
是green
。
在这种情况下,发给x
的消息,如果其routing_key
是orange
的话,则消息会被发送给队列Q1
;如果其routing_key
是black
或者green
的话,则消息会被发送给队列Q2
;如果其routing_key
是其他值的话,那么这条消息会被丢弃!
Multiple bindings
direct
类型的exchange
在绑定多个队列的时候可以使用相同的binding_key
,这是被允许的。这种情况下的路由模式就类似于fanout
即消息会被复制并广播到多个匹配的队列中去,只不过区别在于fanout
不会丢弃消息,而direct
的话,如果routing_key
和binding_key
没有匹配的话,消息就会被丢弃。
Emitting logs
我们将会采用这个新的模型来作为日志系统。日志的严重性我们用routing_key
来表示。
让我们先聚焦提供日志的程序。首先我们声明一个exchange
。
channel.exchange_declare(exchange='direct_logs',
exchange_type='direct')
向该exchange
发送消息。
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
我们假设severity的取值为
- info
- warning
- error
Subscribing
接收消息的消费者
程序和之前的差不多,区别在于我们会根据我们的兴趣和需要做绑定。
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity)
Putting it all together
emit_log_direct.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
receive_logs_direct.py
#!/usr/bin/env python
import pika, sys, os
def main():
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(
exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body.decode()))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
按照官方的测试的话,其实和上一篇博文一样,都会遇到文件写入未成功的问题。因此我们测试时,只能都直接输出到控制台了,这种测试方式不影响我们希望测试的功能。
第一个终端,启动一个仅接收error
的消息的消费者
进程。
python receive_logs_direct.py error
第二个终端,启动一个仅接收info
或者warning
的消息的消费者
进程。
python receive_logs_direct.py info warning error
第三个终端用于运行生产者
程序,每次使用不同的routing_key
。
[root@rabbitmq-01 code]# python emit_log_direct.py error "Run. Run. Or it will explode."
[x] Sent 'error':'Run. Run. Or it will explode.'
[root@rabbitmq-01 code]# python emit_log_direct.py info "Run. Run. Or it will explode."
[x] Sent 'info':'Run. Run. Or it will explode.'
[root@rabbitmq-01 code]# python emit_log_direct.py warning "Run. Run. Or it will explode."
[x] Sent 'warning':'Run. Run. Or it will explode.'
[root@rabbitmq-01 code]# python emit_log_direct.py Disater "Run. Run. Or it will explode."
[x] Sent 'Disater':'Run. Run. Or it will explode.'
4条消息发好之后,我们来看下两个终端的结果。
# 第一个终端
[root@rabbitmq-01 code]# python receive_logs_direct.py error
[*] Waiting for logs. To exit press CTRL+C
[x] 'error':'Run. Run. Or it will explode.'
^CInterrupted
# 第二个终端
[root@rabbitmq-01 code]# python receive_logs_direct.py info warning error
[*] Waiting for logs. To exit press CTRL+C
[x] 'error':'Run. Run. Or it will explode.'
[x] 'info':'Run. Run. Or it will explode.'
[x] 'warning':'Run. Run. Or it will explode.'
^CInterrupted
我们得到以下信息:
- error级别的信息在两个终端上都显示了。因为两个终端对应的队列的
binding_key
都包含error。 - info和warning级别的信息只会在第二个终端上显示。因为只有第二个终端对应的队列的
binding_key
包含info和warning。 - Disater级别的信息没有被显示。因为两个队列的
binding_key
都没有包含Disater,不匹配消息就会被丢弃,这是和fanout
的区别。
总结
这篇博文介绍了direct
类型的exchange
,使得消息可以根据routing_key
来路由到不同的队列中。