[转帖]RabbitMQ学习笔记06:Topics

rabbitmq,学习,笔记,topics · 浏览次数 : 0

小编点评

**topic exchange 消息路由** topic exchange 是一个支持模式匹配符号的交换。它允许您在路由消息之前使用模式匹配符号来进行匹配。 **模式匹配符号** 模式匹配符号是一个以 # 和 * 作为分隔符的单词列表。可以使用 # 和 * 来匹配任何数量的单词。例如,`#red*` 表示任何以 "red" 开头的单词,而 `*orange*` 表示任何包含 "orange" 的单词。 **示例** 假设您的 routing_key 是 "facility.critical"。以下是使用模式匹配符号匹配该字段的示例: * `queue_name = channel.queue_declare(...)` 中的 `queue` 参数指定一个名为 "topic_logs" 的队列。 * `channel.queue_bind(...)` 中的 `routing_key` 参数指定 "facility.critical"。 * `channel.basic_publish(...)` 中的 `body` 参数指定 "Hello World!"。 **使用 topic exchange 的优点** * 支持模式匹配符号。 * 允许您在路由消息之前进行匹配。 * 提高了灵活性和可维护性。 **注意** * routing_key 的长度限制为 255 字节。 * binding_key 的长度限制为 255 字节。 * 如果绑定 key 包含 # 或 *, 则仅匹配匹配第一个单词。

正文

https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_five.html

 

前言

在上一篇博文中我们使用direct类型的exchange改善了我们的日志系统,但是它仍然有一定的限制,它没有办法基于多个条件路由消息。

我们可能不仅仅希望基于日志级别(严重性)来订阅日志,也希望可以基于比如说日志的来源。比如syslog这个unix工具就是这么工作的,它会基于severity (info, warn, crit...) 和facility (auth, cron, kern...) 来路由。

这样就比较灵活了,比如我们可能希望获取crit级别的cron类型日志同时希望获取所有级别的kern类型日志。

想要实现的话,我们需要一个更复杂的exchange,叫做topic

 

Topic exchange

消息发送给topic类型的exchange的时候,不能使用随意的routing_key,它必须得是一个使用小数点分隔的单词列表。单词可以是随意的不过一般都是会和消息有关系。以下是一些有效的routing_key示例:"stock.usd.nyse""nyse.vmw""quick.orange.rabbit"routing_key的大小限制是255字节。

binding_key也必须是同样的格式。topic exchange背后的逻辑其实和direct exchange是类似的,也是将消息发送给routing_keybinding_key匹配的队列。不过在binding_key中有两种特殊的情况:

  • * (star) 替换成一个单词。
  • # (hash) 替换成零或者多个单词,即任意个单词。

在本次的案例中,所有发送的消息都是用来描述动物的。消息发送时的routing_key会包含3个字段的信息

  • 敏捷性 celerity
  • 颜色
  • 物种 species

<celerity>.<colour>.<species>

*.orange.*绑定到队列Q1上,*.*.rabbitlazy.#绑定到队列Q2上。

通过刚才对routing_key的字段的解释以及绑定的关系,我们可以知道:

  • 队列Q1对于橙色的动物比较感兴趣。
  • 队列Q2对于兔子或者懒洋洋的动物比较感兴趣。

接下来我们举例说明一些带有具体的routing_key会进入的队列:

  • quick.orange.rabbit两个队列都会进入。
  • lazy.orange.elephant两个队列都会进入。
  • quick.orange.fox只会进入队列Q1。
  • lazy.brown.fox只会进入队列Q2。
  • lazy.pink.rabbit虽然它匹配了2个binding_key,但是它只会进入队列Q2一次。
  • quick.brown.fox没有任何的binding_key和它匹配,因此这条消息会被丢弃。

虽然上面我们说routing_key只有3个字段,但是由于lazy.#的存在,因此任意字段数都是可以的

  • orange被丢弃。
  • quick.orange.new.rabbit被丢弃。
  • lazy.orange.new.rabbit只会进入队列Q2。

topic exchange很强大,在某些条件下它可以等同于其他类型的exchange

  • 当仅使用#来绑定队列时,它可以接收任意的routing_key的消息,即忽略了routing_key,此时就等同于fanout
  • 如果#*都没有使用而仅使用字符串常量的时候,此时就等同于direct

 

Putting it all together

假设我们的routing_key的形式是<facility>.<severity>

emit_log_topic.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

receive_logs_topic.py

#!/usr/bin/env python
import pika, sys, os

def main():
    connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
        sys.exit(1)

    for binding_key in binding_keys:
        channel.queue_bind(
            exchange='topic_logs', queue=queue_name, routing_key=binding_key)

    print(' [*] Waiting for logs. To exit press CTRL+C')


    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body.decode()))


    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)

    channel.start_consuming()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

测试的方式和以往类似,开启多个终端来启动消费者进程,使用不同的binding_key,监听消息。

python receive_logs_topic.py "#"
python receive_logs_topic.py "kern.*"
python receive_logs_topic.py "*.critical"
python receive_logs_topic.py "kern.*" "*.critical" # 多个binding_key绑定

然后我们发几条消息测试一下。

python emit_log_topic.py "kern.critical" "A critical kernel error"
python emit_log_topic.py "kern.info" "A critical kernel error"
python emit_log_topic.py "space.critical" "A critical kernel error"
python emit_log_topic.py "log.info" "A critical kernel error"

 

总结

这篇博文介绍了topic类型的exchange,在fanoutdirect的基础上,通过模式匹配符号来实现更加灵活的消息和队列的匹配方式。

与[转帖]RabbitMQ学习笔记06:Topics相似的内容:

[转帖]RabbitMQ学习笔记06:Topics

https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_five.html 前言 在上一篇博文中我们使用direct类型的exchange改善了我们的日志系统,但是它仍然有一定的限制,它没有办法基于多个条件路由消息。 我们可能不仅仅希望基于日志级别

[转帖]RabbitMQ学习笔记01:初识与安装

https://www.cnblogs.com/alongdidi/p/rabbitmq_overview.html 原作者写的真好. 前言 本人是一名运维工程师,在此公司接触到 RabbitMQ ,平时针对此软件的工作内容就是集群的安装以及配置监控等,对其的理解也仅仅是知道其是一种消息队列的服务,

[转帖]RabbitMQ学习笔记02:Hello World!

参考资料:RabbitMQ tutorial - "Hello world!" — RabbitMQ 前言 RabbitMQ是一个中间人,它接受和转发消息。我们可以把它想象成一个邮局:当你把邮件投入邮箱的时候,你可以确信它最终会被投递到收件人的手中。RabbitMQ就是那个邮箱、邮局和邮差。区别就在

[转帖]RabbitMQ学习笔记03:Work Queues

参考资料:RabbitMQ tutorial - Work Queues — RabbitMQ 前言 这篇文章我们会创建一个Work Queue,它会在多个worker(即消费者 consumer)中分发耗时的任务。Work Queue也叫做Task Queue是为了避免当处理一个占用资源的任务时必

[转帖]RabbitMQ学习笔记04:Publish/Subscribe

参考资料:RabbitMQ tutorial - Publish/Subscribe — RabbitMQ 前言 在 RabbitMQ学习笔记03:Work Queues 中,每个进入队列中的消息只会被投递给一个消费者进程。而在这篇文章中,我们将会把一条消息同时投递给多个消费者进程。这种模式也叫做p

[转帖]RabbitMQ学习笔记05:Routing

参考资料:RabbitMQ tutorial - Routing — RabbitMQ 前言 在之前的文章中我们构建了一个简单的日志系统,它可以广播消息到多个消费者中。 在这篇文章中,我们打算实现仅订阅消息的子集(即不是所有的消息,仅仅只是一部分消息。注意,这里不是说一条消息的一部分)。例如我们只会

[转帖]RabbitMQ学习笔记07:RPC

https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_six.html 参考资料:RabbitMQ tutorial - Remote procedure call (RPC) — RabbitMQ Remote Procedure Call W

[转帖]RabbitMQ:Exchange的Fanout类型的介绍和使用

https://blog.csdn.net/weixin_45492007/article/details/106095591 1.声明 当前的内容用于本人学习和使用Fanout类型的Exchange,主要理解其主要作用 2.Fanout Exchange的官方介绍 扇出交换机将消息路由到与其绑定的

[转帖]RabbitMQ服务优化,修改最大连接数

https://www.cnblogs.com/hoyeong/p/16242202.html RabbitMQ的优化RabbitMQ的连接数是压垮消息队列的一个重要的指标。所以在平时使用OpenStack平台的过程中,如果大量的用户同时创建虚拟机,会导致云平台创建报错,其实就是消息队列服务的崩溃。

[转帖]RabbitMQ性能优化

https://www.cnblogs.com/zhengchunyuan/p/9253728.html 修改rabbitmq.config文件 rabbitmq.config文件时rabbitmq的配置文件,他遵守Erlang配置文件定义。 rabbitmq.config文件位置: Unix $R