[转帖]RabbitMQ学习笔记04:Publish/Subscribe

rabbitmq,学习,笔记,publish,subscribe · 浏览次数 : 0

小编点评

**文件内容:** ```python # receive_logs.py import pika, sys, osdef main(): connection = pika.BlockingConnection( pika.ConnectionParameters(host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', exchange_type='fanout') result = channel.queue_declare(queue='', exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) def callback(ch, method, properties, body): print(\" [x] %r\" % body.decode()) print(' [*] Waiting for logs. To exit press CTRL+C') channel.basic_consume( queue=queue_name, on_message_callback=callback, auto_ack=True) channel.start_consuming() # emit_log.py import pika, sys, osdef main(): try: main() except KeyboardInterrupt: print('Interrupted') try: sys.exit(0) except SystemExit: os._exit(0) ``` **代码解析:** * **`receive_logs.py`**文件包含两个核心函数:`callback`和`main`。 * ``callback`**函数用于接收来自 exchange 的消息并打印消息信息。 * ``main`**函数用于创建 exchange、队列和绑定关系。 * ``bind`**函数用于将队列绑定到 exchange。 * ``queue_declare`**函数用于创建随机名称的临时队列。 * ``basic_consume`**函数用于从 exchange 中消费消息并打印消息信息。 **主要概念:** * **exchange**:一个用于存储多个队列的队列。 * **队列**:一个用于存储消息的队列。 * **绑定**:将队列绑定到 exchange。 ** fanout 类型 exchange 实现:** * 通过一个 **`exchange`**,可以创建多个 **`队列`**。 * **`fanout`**类型 exchange 可以将消息同时发布到多个 **`队列`**上。 **随机名称队列:** * 通过 **`queue_declare`**函数可以创建随机名称的队列。 * **`exclusive=True`**标记确保队列只能被消费一次。 **结论:** * 这文件展示了如何使用 **`exchange`**和 **`queue`**创建一个随机名称的临时队列并实现 fanout 类型 exchange。

正文

 

 

参考资料:RabbitMQ tutorial - Publish/Subscribe — RabbitMQ 

 

 

前言

在 RabbitMQ学习笔记03:Work Queues 中,每个进入队列中的消息只会被投递给一个消费者进程。而在这篇文章中,我们将会把一条消息同时投递给多个消费者进程。这种模式也叫做publish/subscribe

为了说明这个模式,我们将会创建一个日志系统。它由两个程序组成,第一个会发出日志消息,第二个会接收并打印它们。

在我们的系统中,每一个运行的消费者进程都会收到消息。用这个方式,一个消费者进程将日志写入磁盘上的文件中,同时另一个消费者进程将日志输出到屏幕上。

本质上,发布的日志将会广播到所有的消费者进程。

 

 

Exchanges

在之前的示例中,我们都是直接将消息发送到一个队列或者直接从一个队列中接收消息。现在我们正式介绍RabbitMQ中的完整的消息模型。

我们简单过一下我们之前遇到的一些概念:

  • 生产者(producer)是一个用户程序用来发送消息。
  • 队列(queue)是一个缓冲用来存储消息。
  • 消费者(consumer)是一个用户程序用来接收并处理消息。

消息模型的核心概念是生产者从来不会直接将消息发送给队列。实际上,生产者甚至不会知道消息是否会被发送到队列中去。

取而代之的是,生产者只能将消息发送给exchangeexchange是一个非常简单的东西,一方面它从生产者那边接收消息,另一方面它将消息推送给队列。exchange必须明确知道它要如何处理收到的消息。它是否应该被追加到特定的队列上?它是否应该被追加到多个队列上?它是否应该被丢弃。这些规则都是由exchange 的类型所决定的。

有几种交换器类型可以使用:directtopicheaders and fanout. 我们目前只关注最后一个fanout。我们创建一个类型为fanout 名字叫logsexchange

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

fanout 的逻辑非常简单,它会把消息发送到它所关联的所有队列上。这样就可以满足我们上面说的这套日志系统了。


Listing exchanges

列出exchange的命令

[root@rabbitmq-01 rabbitmq_server-3.11.5]# ./sbin/rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name	type
amq.rabbitmq.trace	topic
amq.match	headers
amq.fanout	fanout
amq.headers	headers
amq.direct	direct
	direct
amq.topic	topic

我们可以看到有一些amq.*以及一个默认的没有名字的exchange,暂时不用理会。 

The default exchange

在之前的示例中,虽然我们没有提到关于exchange的信息,但是我们依然可以使用RabbitMQ。这是因为我们使用了默认的没有名字的exchange,我们使用空字符串""来表示它。

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)

无名的 exchange使用routing_key参数来判断如何路由消息到具体的队列中。


现在我们可以把消息推送到我们刚创建的exchange中了。

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

 

 

Temporary queues

此前我们使用的队列是有具体的名字的(hello, task_queue)。可以命名队列对我们来说是很关键的,我们需要将worker指向相同的队列。当你需要在生产者消费者之间共享一个队列的话,那么给予队列名字就非常重要了。

但是这种情况不适用于我们的日志系统程序。我们想要收到所有的日志消息而不仅仅是其中的一部分。我们也只对当前正在流动的消息感兴趣而不是旧的。要解决这些问题,我们需要做两件事情。

首先,无论我们什么时候连到RabbitMQ上,我们都需要一个新的空的队列。要实现这个,我们需要使用随机的队列名称来创建队列,最好是让RabbitMQ自己来选择一个随机的队列名称。可以通过提供空的队列名称来实现。

result = channel.queue_declare(queue='')

此时,result.method.queue包含了一个随机的队列名称,形如amq.gen-JzTY20BRgKO-HjmUJj0wLg

其次,一旦消费者连接关闭了,那么队列应该被删除,我们通过exclusive flag 来实现:

result = channel.queue_declare(queue='', exclusive=True)

想要了解队列的其他 flags 和其他的属性,可以参考 Queues  — RabbitMQ 

 

 

Bindings

我们已经创建了一个类型为fanoutexchange和一个随机名称,exclusive队列。接下来我们需要将exchange和队列进行绑定 binding,这样子发送给exchange的消息才可以进入队列。

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

从现在开始,发送给名为logsexchange的消息将会被路由到名为result.method.queue的队列了。

Listing bindings

rabbitmqctl list_bindings

 

 

Putting it all together

生产者程序用于发出日志消息,代码和之前的其实差不多。主要的区别在于这次我们是将消息发送到有名字的名为logsexchange中,而不再是无名的exchange了。我们同样需要提供routeing_key,不过由于不是使用默认的无名exchange,因此它不再表示队列的名称;因为exchange的类型是fanout,因此routeing_key可以随便填写,它会被忽略。

emit_log.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" [x] Sent %r" % message)
connection.close()

建立完连接以后我们就必须声明一个exchange,因为将消息发布给不存在的exchange是被禁止的。

如果没有队列绑定到exchange上的话,消息就会丢失。不过对于我们来说这是允许的,因为如果没有消费者在监听,我们就可以安全地丢弃这些消息。

receive_logs.py

#!/usr/bin/env python
import pika, sys, os

def main():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    channel.queue_bind(exchange='logs', queue=queue_name)

    def callback(ch, method, properties, body):
        print(" [x] %r" % body.decode())

    print(' [*] Waiting for logs. To exit press CTRL+C')
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)

    channel.start_consuming()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

测试的方法是打开3个终端。

在官方中,是第一个终端运行

python receive_logs.py > logs_from_rabbit.log

第二个终端运行

python receive_logs.py

第三个终端运行

python emit_log.py

文件写入错误

按理来说是两个终端都会收到消息,然后第一个终端会把消息写入日志文件。我的实验结果是2个终端都可以收到消息,但似乎第一个终端是无法写入文件的。

因此前2个终端我们都是运行

python receive_logs.py

第三个终端保持不变。

前两个终端的结果都是

[root@rabbitmq-01 code]# python receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info: Hello World!'

第三个终端是直接运行,使用默认的消息。

[root@rabbitmq-01 code]# python emit_log.py 
 [x] Sent 'info: Hello World!'

这就证明了我们这条消息,确实同时发送给了2个队列,实现了我们最初的功能。

至于为什么重定向到文件就不行了,不懂。不知道是否和代码有关系,我也处理不了。但只要我们的测试,能够证明消息有同时发布到2个队列上即可。

我们来看下是否有我们创建的exchange

[root@rabbitmq-01 rabbitmq_server-3.11.5]# ./sbin/rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name	type
... ...
logs	fanout

再来看一下我们自建的exchange和队列的绑定关系。

[root@rabbitmq-01 rabbitmq_server-3.11.5]# ./sbin/rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name	source_kind	destination_name	destination_kind	routing_key	arguments
logs	exchange	amq.gen-jYPqTCqejxtn7e5GL4pNvw	queue	amq.gen-jYPqTCqejxtn7e5GL4pNvw	[]
logs	exchange	amq.gen-xWCPATcJOUE_ZsskZiJPAg	queue	amq.gen-xWCPATcJOUE_ZsskZiJPAg	[]

 

 

总结

这里我们第一次介绍了另一个核心概念exchangeexchange和队列的绑定。

通过fanout类型的exchange实现将一条消息同时发布到多个队列中。

通过空队列名称+exclusive=True标记实现了随机名称的临时队列。

学会了列出exchangebindings 

与[转帖]RabbitMQ学习笔记04:Publish/Subscribe相似的内容:

[转帖]RabbitMQ学习笔记04:Publish/Subscribe

参考资料:RabbitMQ tutorial - Publish/Subscribe — RabbitMQ 前言 在 RabbitMQ学习笔记03:Work Queues 中,每个进入队列中的消息只会被投递给一个消费者进程。而在这篇文章中,我们将会把一条消息同时投递给多个消费者进程。这种模式也叫做p

[转帖]RabbitMQ学习笔记01:初识与安装

https://www.cnblogs.com/alongdidi/p/rabbitmq_overview.html 原作者写的真好. 前言 本人是一名运维工程师,在此公司接触到 RabbitMQ ,平时针对此软件的工作内容就是集群的安装以及配置监控等,对其的理解也仅仅是知道其是一种消息队列的服务,

[转帖]RabbitMQ学习笔记02:Hello World!

参考资料:RabbitMQ tutorial - "Hello world!" — RabbitMQ 前言 RabbitMQ是一个中间人,它接受和转发消息。我们可以把它想象成一个邮局:当你把邮件投入邮箱的时候,你可以确信它最终会被投递到收件人的手中。RabbitMQ就是那个邮箱、邮局和邮差。区别就在

[转帖]RabbitMQ学习笔记03:Work Queues

参考资料:RabbitMQ tutorial - Work Queues — RabbitMQ 前言 这篇文章我们会创建一个Work Queue,它会在多个worker(即消费者 consumer)中分发耗时的任务。Work Queue也叫做Task Queue是为了避免当处理一个占用资源的任务时必

[转帖]RabbitMQ学习笔记05:Routing

参考资料:RabbitMQ tutorial - Routing — RabbitMQ 前言 在之前的文章中我们构建了一个简单的日志系统,它可以广播消息到多个消费者中。 在这篇文章中,我们打算实现仅订阅消息的子集(即不是所有的消息,仅仅只是一部分消息。注意,这里不是说一条消息的一部分)。例如我们只会

[转帖]RabbitMQ学习笔记06:Topics

https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_five.html 前言 在上一篇博文中我们使用direct类型的exchange改善了我们的日志系统,但是它仍然有一定的限制,它没有办法基于多个条件路由消息。 我们可能不仅仅希望基于日志级别

[转帖]RabbitMQ学习笔记07:RPC

https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_six.html 参考资料:RabbitMQ tutorial - Remote procedure call (RPC) — RabbitMQ Remote Procedure Call W

[转帖]RabbitMQ:Exchange的Fanout类型的介绍和使用

https://blog.csdn.net/weixin_45492007/article/details/106095591 1.声明 当前的内容用于本人学习和使用Fanout类型的Exchange,主要理解其主要作用 2.Fanout Exchange的官方介绍 扇出交换机将消息路由到与其绑定的

[转帖]RabbitMQ服务优化,修改最大连接数

https://www.cnblogs.com/hoyeong/p/16242202.html RabbitMQ的优化RabbitMQ的连接数是压垮消息队列的一个重要的指标。所以在平时使用OpenStack平台的过程中,如果大量的用户同时创建虚拟机,会导致云平台创建报错,其实就是消息队列服务的崩溃。

[转帖]RabbitMQ性能优化

https://www.cnblogs.com/zhengchunyuan/p/9253728.html 修改rabbitmq.config文件 rabbitmq.config文件时rabbitmq的配置文件,他遵守Erlang配置文件定义。 rabbitmq.config文件位置: Unix $R