[转帖]RabbitMQ学习笔记05:Routing

rabbitmq,学习,笔记,routing · 浏览次数 : 0

小编点评

## RabbitMQ 中的 Direct 类型 Exchange 本博文介绍了如何使用 RabbitMQ 创建一个仅接收特定消息的子集,并通过 routing_key 进行路由。 **基本概念:** * **Exchange:** 是一组绑定的队列。多个队列可以绑定到同一个 exchange。 * **Queue:** 是一个用于存储消息的队列。 * **Binding:** 绑定一个队列到一个 exchange。 * **Routing key:** 用于指定消息应该被路由到哪个队列的标识符。 **Direct 类型 Exchange 的使用:** Direct 类型 exchange 的路由算法非常简单,它使用 routing_key 来匹配消息的来源。如果 routing_key 与绑定时设置的 key 相匹配,消息就会被发送到与 exchange 绑定的所有队列中。 **例子:** ```python channel.exchange_declare(exchange='direct_logs', exchange_type='direct') severity = sys.argv[1] if len(sys.argv) > 2 else 'info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) ``` 这段代码将发送一个名为 `error` 的消息到名为 `direct_logs` 的 exchange 中。如果 `severity` 是 `error`,消息会发送到所有与 `direct_logs` 绑定的队列中。 **优点:** * 使用 routing_key 可以根据消息的来源进行路由。 * 提高了性能,因为消息仅在与 exchange 绑定的所有队列中发送。 **注意:** * routing_key 的选择非常重要,因为它决定了消息最终被发送到的队列。 * 如果没有设置 proper_fanout_dead_letter_q_size 参数,所有队列都将接收所有消息。 * 如果 routing_key 存在多个匹配的值,消息会根据第一个匹配的值发送。

正文

 

 

参考资料:RabbitMQ tutorial - Routing — RabbitMQ 

 

前言

在之前的文章中我们构建了一个简单的日志系统,它可以广播消息到多个消费者中。

在这篇文章中,我们打算实现仅订阅消息的子集(即不是所有的消息,仅仅只是一部分消息。注意,这里不是说一条消息的一部分)。例如我们只会把严重的错误消息写到日志文件中,同时我们会把所有的消息都输出到屏幕上。

 

Bindings

在之前的示例中我们学会了绑定exchange和队列。

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name)

绑定是exchange和队列之间的关系,我们可以简单理解为某个队列对于来自于某个exchange的消息感兴趣。

在绑定的过程当中可以使用额外的routing_key参数。为了避免和basic_publish参数混淆,我们将其称之为binding_key

其实binding_key就是exchange和队列在绑定的时候使用的routing_key

下面是我们在绑定的时候创建了一个binding_key

channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')

binding_key的含义取决于exchange的类型,我们之前创建的fanout类型的exchange会忽略这个参数的值。

 

Direct exchange

想要完成前言中我们提到的需求的话,我们需要使用direct类型的exchangedirect的路由算法也十分简单,发送给direct类型的exchange的消息一般会带有一个routing_key参数,这条消息会被路由到带有相同的binding_key值的队列中。这种匹配是必须准确无误的。

就像这样:

从图中我们可以看到,direct类型的名为xexchange绑定了2个队列Q1Q2,在与Q1绑定时的binding_keyorange,在与Q2绑定时共绑定了2次,第一个绑定的binding_keyblack,第二个绑定的binding_keygreen

在这种情况下,发给x的消息,如果其routing_keyorange的话,则消息会被发送给队列Q1;如果其routing_keyblack或者green的话,则消息会被发送给队列Q2;如果其routing_key是其他值的话,那么这条消息会被丢弃

 

Multiple bindings

direct类型的exchange在绑定多个队列的时候可以使用相同的binding_key,这是被允许的。这种情况下的路由模式就类似于fanout即消息会被复制并广播到多个匹配的队列中去,只不过区别在于fanout不会丢弃消息,而direct的话,如果routing_keybinding_key没有匹配的话,消息就会被丢弃。

 

Emitting logs

我们将会采用这个新的模型来作为日志系统。日志的严重性我们用routing_key来表示。

让我们先聚焦提供日志的程序。首先我们声明一个exchange

channel.exchange_declare(exchange='direct_logs',
                         exchange_type='direct')

向该exchange发送消息。

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)

我们假设severity的取值为

  • info
  • warning
  • error

 

Subscribing

接收消息的消费者程序和之前的差不多,区别在于我们会根据我们的兴趣和需要做绑定。

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

 

Putting it all together

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

severity = sys.argv[1] if len(sys.argv) > 2 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

receive_logs_direct.py

#!/usr/bin/env python
import pika, sys, os

def main():
    connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
        sys.exit(1)

    for severity in severities:
        channel.queue_bind(
            exchange='direct_logs', queue=queue_name, routing_key=severity)

    print(' [*] Waiting for logs. To exit press CTRL+C')


    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body.decode()))


    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)

    channel.start_consuming()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

按照官方的测试的话,其实和上一篇博文一样,都会遇到文件写入未成功的问题。因此我们测试时,只能都直接输出到控制台了,这种测试方式不影响我们希望测试的功能。

第一个终端,启动一个仅接收error的消息的消费者进程。

python receive_logs_direct.py error

第二个终端,启动一个仅接收info或者warning的消息的消费者进程。

python receive_logs_direct.py info warning error

第三个终端用于运行生产者程序,每次使用不同的routing_key

[root@rabbitmq-01 code]# python emit_log_direct.py error "Run. Run. Or it will explode."
 [x] Sent 'error':'Run. Run. Or it will explode.'
[root@rabbitmq-01 code]# python emit_log_direct.py info "Run. Run. Or it will explode."
 [x] Sent 'info':'Run. Run. Or it will explode.'
[root@rabbitmq-01 code]# python emit_log_direct.py warning "Run. Run. Or it will explode."
 [x] Sent 'warning':'Run. Run. Or it will explode.'
[root@rabbitmq-01 code]# python emit_log_direct.py Disater "Run. Run. Or it will explode."
 [x] Sent 'Disater':'Run. Run. Or it will explode.'

4条消息发好之后,我们来看下两个终端的结果。

# 第一个终端
[root@rabbitmq-01 code]# python receive_logs_direct.py error
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'error':'Run. Run. Or it will explode.'
^CInterrupted

# 第二个终端
[root@rabbitmq-01 code]# python receive_logs_direct.py info warning error
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'error':'Run. Run. Or it will explode.'
 [x] 'info':'Run. Run. Or it will explode.'
 [x] 'warning':'Run. Run. Or it will explode.'
^CInterrupted

我们得到以下信息:

  • error级别的信息在两个终端上都显示了。因为两个终端对应的队列的binding_key都包含error。
  • info和warning级别的信息只会在第二个终端上显示。因为只有第二个终端对应的队列的binding_key包含info和warning。
  • Disater级别的信息没有被显示。因为两个队列的binding_key都没有包含Disater,不匹配消息就会被丢弃,这是和fanout的区别。

 

总结

这篇博文介绍了direct类型的exchange,使得消息可以根据routing_key来路由到不同的队列中。

与[转帖]RabbitMQ学习笔记05:Routing相似的内容:

[转帖]RabbitMQ学习笔记05:Routing

参考资料:RabbitMQ tutorial - Routing — RabbitMQ 前言 在之前的文章中我们构建了一个简单的日志系统,它可以广播消息到多个消费者中。 在这篇文章中,我们打算实现仅订阅消息的子集(即不是所有的消息,仅仅只是一部分消息。注意,这里不是说一条消息的一部分)。例如我们只会

[转帖]RabbitMQ学习笔记01:初识与安装

https://www.cnblogs.com/alongdidi/p/rabbitmq_overview.html 原作者写的真好. 前言 本人是一名运维工程师,在此公司接触到 RabbitMQ ,平时针对此软件的工作内容就是集群的安装以及配置监控等,对其的理解也仅仅是知道其是一种消息队列的服务,

[转帖]RabbitMQ学习笔记02:Hello World!

参考资料:RabbitMQ tutorial - "Hello world!" — RabbitMQ 前言 RabbitMQ是一个中间人,它接受和转发消息。我们可以把它想象成一个邮局:当你把邮件投入邮箱的时候,你可以确信它最终会被投递到收件人的手中。RabbitMQ就是那个邮箱、邮局和邮差。区别就在

[转帖]RabbitMQ学习笔记03:Work Queues

参考资料:RabbitMQ tutorial - Work Queues — RabbitMQ 前言 这篇文章我们会创建一个Work Queue,它会在多个worker(即消费者 consumer)中分发耗时的任务。Work Queue也叫做Task Queue是为了避免当处理一个占用资源的任务时必

[转帖]RabbitMQ学习笔记04:Publish/Subscribe

参考资料:RabbitMQ tutorial - Publish/Subscribe — RabbitMQ 前言 在 RabbitMQ学习笔记03:Work Queues 中,每个进入队列中的消息只会被投递给一个消费者进程。而在这篇文章中,我们将会把一条消息同时投递给多个消费者进程。这种模式也叫做p

[转帖]RabbitMQ学习笔记06:Topics

https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_five.html 前言 在上一篇博文中我们使用direct类型的exchange改善了我们的日志系统,但是它仍然有一定的限制,它没有办法基于多个条件路由消息。 我们可能不仅仅希望基于日志级别

[转帖]RabbitMQ学习笔记07:RPC

https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_six.html 参考资料:RabbitMQ tutorial - Remote procedure call (RPC) — RabbitMQ Remote Procedure Call W

[转帖]RabbitMQ:Exchange的Fanout类型的介绍和使用

https://blog.csdn.net/weixin_45492007/article/details/106095591 1.声明 当前的内容用于本人学习和使用Fanout类型的Exchange,主要理解其主要作用 2.Fanout Exchange的官方介绍 扇出交换机将消息路由到与其绑定的

[转帖]RabbitMQ服务优化,修改最大连接数

https://www.cnblogs.com/hoyeong/p/16242202.html RabbitMQ的优化RabbitMQ的连接数是压垮消息队列的一个重要的指标。所以在平时使用OpenStack平台的过程中,如果大量的用户同时创建虚拟机,会导致云平台创建报错,其实就是消息队列服务的崩溃。

[转帖]RabbitMQ性能优化

https://www.cnblogs.com/zhengchunyuan/p/9253728.html 修改rabbitmq.config文件 rabbitmq.config文件时rabbitmq的配置文件,他遵守Erlang配置文件定义。 rabbitmq.config文件位置: Unix $R