前言
在上一篇博文中我们使用direct
类型的exchange
改善了我们的日志系统,但是它仍然有一定的限制,它没有办法基于多个条件路由消息。
我们可能不仅仅希望基于日志级别(严重性)来订阅日志,也希望可以基于比如说日志的来源。比如syslog
这个unix工具就是这么工作的,它会基于severity
(info, warn, crit...) 和facility
(auth, cron, kern...) 来路由。
这样就比较灵活了,比如我们可能希望获取crit级别的cron类型日志同时希望获取所有级别的kern类型日志。
想要实现的话,我们需要一个更复杂的exchange
,叫做topic
。
Topic exchange
消息发送给topic
类型的exchange
的时候,不能使用随意的routing_key
,它必须得是一个使用小数点分隔的单词列表。单词可以是随意的不过一般都是会和消息有关系。以下是一些有效的routing_key
示例:"stock.usd.nyse"
, "nyse.vmw"
, "quick.orange.rabbit"
。routing_key
的大小限制是255字节。
binding_key
也必须是同样的格式。topic exchange
背后的逻辑其实和direct exchange
是类似的,也是将消息发送给routing_key
和binding_key
匹配的队列。不过在binding_key
中有两种特殊的情况:
*
(star) 替换成一个单词。#
(hash) 替换成零或者多个单词,即任意个单词。
在本次的案例中,所有发送的消息都是用来描述动物的。消息发送时的routing_key
会包含3个字段的信息
- 敏捷性 celerity
- 颜色
- 物种 species
<celerity>.<colour>.<species>
*.orange.*
绑定到队列Q1上,*.*.rabbit
和lazy.#
绑定到队列Q2上。
通过刚才对routing_key
的字段的解释以及绑定的关系,我们可以知道:
- 队列Q1对于橙色的动物比较感兴趣。
- 队列Q2对于兔子或者懒洋洋的动物比较感兴趣。
接下来我们举例说明一些带有具体的routing_key
会进入的队列:
quick.orange.rabbit
两个队列都会进入。lazy.orange.elephant
两个队列都会进入。quick.orange.fox
只会进入队列Q1。lazy.brown.fox
只会进入队列Q2。lazy.pink.rabbit
虽然它匹配了2个binding_key
,但是它只会进入队列Q2一次。quick.brown.fox
没有任何的binding_key
和它匹配,因此这条消息会被丢弃。
虽然上面我们说routing_key
只有3个字段,但是由于lazy.#
的存在,因此任意字段数都是可以的
orange
被丢弃。quick.orange.new.rabbit
被丢弃。lazy.orange.new.rabbit
只会进入队列Q2。
topic exchange
很强大,在某些条件下它可以等同于其他类型的exchange
。
- 当仅使用
#
来绑定队列时,它可以接收任意的routing_key
的消息,即忽略了routing_key
,此时就等同于fanout
。 - 如果
#
和*
都没有使用而仅使用字符串常量的时候,此时就等同于direct
。
Putting it all together
假设我们的routing_key
的形式是<facility>.<severity>
。
emit_log_topic.py
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
receive_logs_topic.py
#!/usr/bin/env python
import pika, sys, os
def main():
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(
exchange='topic_logs', queue=queue_name, routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body.decode()))
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
测试的方式和以往类似,开启多个终端来启动消费者
进程,使用不同的binding_key
,监听消息。
python receive_logs_topic.py "#"
python receive_logs_topic.py "kern.*"
python receive_logs_topic.py "*.critical"
python receive_logs_topic.py "kern.*" "*.critical" # 多个binding_key绑定
然后我们发几条消息测试一下。
python emit_log_topic.py "kern.critical" "A critical kernel error"
python emit_log_topic.py "kern.info" "A critical kernel error"
python emit_log_topic.py "space.critical" "A critical kernel error"
python emit_log_topic.py "log.info" "A critical kernel error"
总结
这篇博文介绍了topic
类型的exchange
,在fanout
和direct
的基础上,通过模式匹配符号来实现更加灵活的消息和队列的匹配方式。