摘要:本文围绕AMQP协议,为大家详细解析AMQP协议、核心技术亮点、多协议之间的对比以及使用实践。
本文分享自华为云社区《物联网常见协议之Amqp协议及使用场景解析》,作者:张俭。
本文围绕AMQP协议,为大家详细解析AMQP协议、核心技术亮点、多协议之间的对比以及使用实践,并介绍华为云IoT通过Amqp协议如何为开发者和企业提供了更加灵活和高效的通信方式,使得物联网应用得以在各个领域得到更广泛的推广和应用。
AMQP协议,全称为Advanced Message Queuing Protocol。在2006年6月,由Cisco、Redhat、iMatrix等联合制定了AMQP的消息标准。
除了AMQP协议,还有一些其他协议如Mqtt(Message Queuing Telemetry Transport)、Http、Kafka。每个协议的发明/出现都是为了解决特定的问题。没有最合适的协议,只有更合适的业务场景。在后面我们也会对这些协议进行简单的对比。
Amqp历史上大概有如下四个版本,
我们也会主要讨论Amqp 0-9-1 和 Amqp 1.0这两个版本
Amqp 0-9-1 协议是一个 多链路、协商的、异步、安全、可移植、高效的协议。Amqp协议通常分为两层:
+------------------Functional Layer----------------+ | Basic Transactions Exchanges Message queues | +--------------------------------------------------+ +------------------Transport Layer-----------------+ | Framing Content Data representation | | Error handling Heart-beating Channels | +--------------------------------------------------+
此外,由于Amqp协议的message queue支持许多特性:私有或共享、持久化或临时等等。根据不同的属性设定,我们可将AMQP用于许多应用场景,例如
Amqp 0-9-1的协议帧由 FrameHeader、Payload、FrameEnd组成
协议设计层面:
对称层面:
Amqp1.0 的协议帧由FrameHeader、ExtendedHeader、FrameBody组成。
Amqp帧类型代码为0x00。对于Amqp帧来说,FrameHeader的第6字节和第7字节表示channel的编号。Frame Body 被定义为一个 performative 后跟一个不透明的 payload。表现形式必须是第open、begin、attach、flow、transfer、disposition、detach、end、close中定义的一个,并在AMQP类型系统中编码为描述的类型。帧体中剩余的字节构成了该帧的 payload。payload 的存在和格式由给定表现形式的语义定义。
Sasl帧类型代码为0x01。FrameHeader中的第6和第7字节应该被忽略。也不存在扩展头。所以DOFF固定位0x02。
Amqp和Mqtt都是应用层的消息传递协议,mqtt更加轻量,相对来说概念不如amqp那么丰富,同时mqtt头部消息更加短小。更加适用于低带宽、功耗较低的物联网设备
AMQP是一种非常灵活的协议,可以用于各种类型的消息传递场景,包括点对点和发布-订阅模型。Kafka则专注于高吞吐量的流式处理,适用于数据管道和流式处理等场景。
Kafka的设计旨在提供高吞吐量和低延迟。AMQP的性能因实现和使用情况而异,但在大多数情况下,它的性能不如Kafka。
Kafka拥有强大的生态系统,包括流处理、数据湖、消息队列等多个应用场景。AMQP也有相应的生态系统和工具,但相对来说要小得多。
总得来说,尽管kafka存在性能上的优势,但kafka broker很难对外暴露。相较于kafka这种私有消息中间件协议,Amqp足够标准,更适合各种异构系统的对接。
提到AMQP,就不得不提rabbitmq。RabbitMQ 是一个开源的消息代理和队列服务器,用于通过高级消息队列协议(AMQP)在分布式系统中实现消息传递。RabbitMQ 提供了一个可靠、高性能、可扩展和易于使用的消息传递平台,支持多种编程语言和平台。它最初是用 Erlang 语言编写的,因此具有良好的并发性能和容错能力。
所谓成也erlang,败也erlang,由于erlang语言生态的问题,有能力深入维护Rabbitmq的人员并不是很多,也是rabbitmq越来越不流行的一个原因。
Apache Qpid(Quick Platform for Interactive Distributed Messaging)是一个开源的消息传递系统,它实现了高级消息队列协议(Advanced Message Queuing Protocol,AMQP)的多种版本。AMQP 是一种开放标准的应用层协议,用于消息传递的中间件,它可以实现跨平台、跨语言的消息通信。Qpid 项目的主要目标是提供一个可靠、可扩展和高性能的消息传递平台,帮助开发者更容易地构建分布式系统。主要的组件有
但Qpid总得来说,比较重型,如果仅仅是想在原有的消息组件,如kafka/pulsar外面叠加一层Amqp可访问的能力,我相信proton是更好的选择。
Vert.x Proton 的目标是结合 Vert.x 的响应式编程模型和 Qpid Proton 的 AMQP 支持,以简化构建高性能、可扩展的、基于 AMQP 的分布式应用程序。Vert.x Proton 提供了一套简洁、易用的 API,可以让开发者在 Vert.x 应用程序中轻松地实现 AMQP 通信。
在最初阶段华为云IoTDA主要支持HTTP协议,尽管这种方式已经能满足许多需求,但随着物联网技术的普及和发展,用户对于更加灵活和高效的通信方式的需求逐渐增强,华为云IoTDA逐渐丰富协议库,当前支持60+协议接入,为开发者和企业提供更加完善的解决方案。
在IoT应用对接场景中,华为云IoT现已新增了对AMQP的支持,与HTTP协议相比,AMQP协议具有以下优势
通过支持AMQP协议,华为云IoT为开发者和企业提供了更加灵活和高效的通信方式,使得物联网应用得以在各个领域得到更广泛的推广和应用。
首先通过pip 安装依赖包
pip install python-qpid-proton
最简单的消费者demo, consumer.py如下
import sys from proton.handlers import MessagingHandler from proton.reactor import Container class AMQPConsumer(MessagingHandler): def __init__(self, server_url, target_address): super(AMQPConsumer, self).__init__() self.server_url = server_url self.target_address = target_address def on_start(self, event): conn = event.container.connect(self.server_url) event.container.create_receiver(conn, self.target_address) def on_message(self, event): print(f"Received message: {event.message.body}") event.connection.close() if __name__ == "__main__": server_url = "amqp://localhost:5672" target_address = "example_queue" try: Container(AMQPConsumer(server_url, target_address)).run() except KeyboardInterrupt: sys.exit(0)
我们可以使用这个producer.py验证 consumer.py可用
import sys from proton import Message from proton.handlers import MessagingHandler from proton.reactor import Container class AMQPProducer(MessagingHandler): def __init__(self, server_url, target_address, message_body): super(AMQPProducer, self).__init__() self.server_url = server_url self.target_address = target_address self.message_body = message_body def on_start(self, event): conn = event.container.connect(self.server_url) self.sender = event.container.create_sender(conn, self.target_address) def on_sendable(self, event): message = Message(body=self.message_body) event.sender.send(message) print(f"Sent message: {message.body}") event.connection.close() if __name__ == "__main__": server_url = "amqp://localhost:5672" target_address = "example_queue" message_body = "Hello, AMQP 1.0!" try: Container(AMQPProducer(server_url, target_address, message_body)).run() except KeyboardInterrupt: sys.exit(0)
为了能连接上华为云IoTDA的Amqp接入点,我们还需要给consumer.py配置用户名、密码。如下为样例代码,具体连接的信息、凭据如何获得可参考: https://support.huaweicloud.com/devg-iothub/iot_01_00100_2.html。注意,url也从amqp修改为了amqps
import sys from proton import Message, SSLDomain from proton.handlers import MessagingHandler from proton.reactor import Container class AMQPConsumer(MessagingHandler): def __init__(self, server_url, target_address, username, password, cert_file, key_file): super(AMQPConsumer, self).__init__() self.server_url = server_url self.target_address = target_address self.username = username self.password = password self.cert_file = cert_file self.key_file = key_file def on_start(self, event): ssl_domain = SSLDomain(mode=SSLDomain.MODE_CLIENT) ssl_domain.set_credentials(self.cert_file, self.key_file, None) conn = event.container.connect(self.server_url, user=self.username, password=self.password, ssl_domain=ssl_domain) event.container.create_receiver(conn, self.target_address) def on_message(self, event): print(f"Received message: {event.message.body}") event.connection.close() if __name__ == "__main__": server_url = "amqps://localhost:5671" # 注意 'amqps',它表示使用 SSL/TLS 连接 target_address = "example_queue" username = "your_username" password = "your_password" cert_file = "path/to/your/certificate.pem" key_file = "path/to/your/private_key.pem" try: Container(AMQPConsumer(server_url, target_address, username, password, cert_file, key_file)).run() except KeyboardInterrupt: sys.exit(0)
该样例代码已上传到gitee
总体来说,AMQP作为一种应用层协议,在消息传递和异构系统之间的通信方面提供了非常灵活和可靠的解决方案。与其他消息传递协议相比,AMQP具有丰富的功能和灵活的设计,适用于各种类型的消息传递场景。
在使用AMQP时,我们可以选择现有的开源实现,如RabbitMQ、Qpid等,也可以自行实现AMQP的相关组件。通过这些实现,我们可以轻松地在不同的应用程序、语言和平台之间进行消息传递,并实现可靠、高效、安全的通信。
随着物联网、云计算和大数据等技术的发展,AMQP的应用场景越来越广泛,比如在IoT设备、大数据流处理、分布式系统等方面都得到了广泛应用。未来,AMQP将继续发挥重要作用,推动各种异构系统之间的互联互通,带来更加便捷和高效的消息传递体验。