物联网常见协议之Amqp协议及使用场景解析

联网,常见,协议,amqp,使用,场景,解析 · 浏览次数 : 45

小编点评

## Huawei Amqp 消息传递介绍 **Amqp**是一种应用层协议,在消息传递和异构系统之间的通信方面提供了非常灵活和可靠的解决方案。 **与其他消息传递协议相比,Amqp具有丰富的功能和灵活的设计,适用于各种类型的消息传递场景。** **以下是Amqp的一些主要功能:** * 支持点对点和发布订阅模式 * 提供消息队列功能 * 丰富的功能和灵活的设计 **Amqp的使用场景包括:** * IoT设备 * 大数据流处理 *分布式系统 *等等 **使用Amqp时需要带简单的排版:** ```python import sys from proton import Message, SSLDomainfrom proton.handlers import MessagingHandlerfrom proton.reactor import Containerclass AMQPConsumer(MessagingHandler): def __init__(self, server_url, target_address): super(AMQPConsumer, self).__init__() self.server_url = server_url self.target_address = target_address def on_start(self, event): ssl_domain = SSLDomain(mode=SSLDomain.MODE_CLIENT) ssl_domain.set_credentials(self.cert_file, self.key_file, None) conn = event.container.connect(self.server_url, user=self.username, password=self.password, ssl_domain=ssl_domain) event.container.create_receiver(conn, self.target_address) def on_message(self, event): print(f\"Received message: {event.message.body}\") event.connection.close()if __name__ == \"__main__\": server_url = \"amqps://localhost:5671\" # 注意 'amqps',它表示使用 SSL/TLS 连接 target_address = \"example_queue\" username = "your_username" password = "your_password" cert_file = "path/to/your/certificate.pem" key_file = "path/to/your/private_key.pem" try: Container(AMQPConsumer(server_url, target_address, username, password, cert_file, key_file)).run() except KeyboardInterrupt: sys.exit(0) ``` **其他信息:** * AMQP是应用层协议,需要在客户端应用程序中实现。 * 常见实现方案包括 RabbitMQ、Qpid 等。 * AMQP支持多种消息传递模式,例如点对点、发布订阅等。 * AMQP提供消息队列功能,可以实现消息传递的延迟。

正文

摘要:本文围绕AMQP协议,为大家详细解析AMQP协议、核心技术亮点、多协议之间的对比以及使用实践。

本文分享自华为云社区《物联网常见协议之Amqp协议及使用场景解析》,作者:张俭。

引言

本文围绕AMQP协议,为大家详细解析AMQP协议、核心技术亮点、多协议之间的对比以及使用实践,并介绍华为云IoT通过Amqp协议如何为开发者和企业提供了更加灵活和高效的通信方式,使得物联网应用得以在各个领域得到更广泛的推广和应用。

AMQP协议,全称为Advanced Message Queuing Protocol。在2006年6月,由Cisco、Redhat、iMatrix等联合制定了AMQP的消息标准。

除了AMQP协议,还有一些其他协议如Mqtt(Message Queuing Telemetry Transport)、Http、Kafka。每个协议的发明/出现都是为了解决特定的问题。没有最合适的协议,只有更合适的业务场景。在后面我们也会对这些协议进行简单的对比。

Amqp历史上大概有如下四个版本,

  • Amqp 0-8: 发布于2006年
  • Amqp 0-9-1:发布于2008年,是Amqp 0-8的改进版,被广泛应用,如rabbitmq、qpid等
  • Amqp 0-10:发布于2008年,是Amqp 0-9-1的改进,未被广泛使用
  • Amqp 1.0:发布于2011年,是Amqp协议的下一代标准,与之前的版本不兼容,但提供了更强大的特性和更好的性能。目前也在华为云IoT、Azure中有应用起来。包括rabbitmq、qpid等也提供了对Amqp1.0版本协议的支持

我们也会主要讨论Amqp 0-9-1 和 Amqp 1.0这两个版本

Amqp 0-9-1 协议简述

核心概念

  • Virtual Host:简称vhost,个人理解是Amqp协议上的多租,每个vhost具有自己的Exchanges、Message Queues等,互相不干扰。
  • Exchange: 从生产者应用程序中接收消息,并根据特定的情况(消息属性或内容),将这些消息路由到“Message Queue”中
  • Message Queue: 消息队列,存储消息,直到它被消费者应用程序安全地处理
  • Binding:指Exchange将何种类型的消息发送到Queue中,提供消息路由机制

Amqp 0-9-1 协议是一个 多链路、协商的、异步、安全、可移植、高效的协议。Amqp协议通常分为两层:

+------------------Functional Layer----------------+
| Basic Transactions Exchanges Message queues     |
+--------------------------------------------------+
+------------------Transport Layer-----------------+
| Framing Content Data representation             |
| Error handling Heart-beating Channels |
+--------------------------------------------------+

此外,由于Amqp协议的message queue支持许多特性:私有或共享、持久化或临时等等。根据不同的属性设定,我们可将AMQP用于许多应用场景,例如

  • 消息中间件使用:共享的存储转发队列,保存消息并交给多个消费者消费
  • RPC使用:通过将队列设定为临时的,带有IP地址的,来模拟RPC接口

Amqp 0-9-1 生产时序图

Amqp 0-9-1 消费时序图

Amqp 0-9-1 协议帧及数据类型

Amqp 0-9-1的协议帧由 FrameHeader、Payload、FrameEnd组成

  • Integers 整数(1到8个字节):用于表示大小、数量、限制等。整数总是无符号的,
  • Bits 位:用于表示开/关值,一个八位字节。
  • Short strings 短字符串:用于存储短文本属性。短字符串长度限制为255个八位字节。
  • Long strings 长字符串:用于存储二进制数据块。
  • Field tables 字段表:存储名称-值对。字段值可以是字符串、整数等类型。

Amqp 1-0协议

与Amqp 0-9-1的差异

协议设计层面:

  • AMQP 0-9-1:此版本的 AMQP 主要针对代理的设计,涵盖了消息传递模型、代理行为和交互模式。0-9-1 版本的协议与代理的实现紧密耦合。
  • AMQP 1.0:此版本的 AMQP 更注重基于互操作性的通信协议,不依赖于特定的代理实现。AMQP 1.0 关注点在于在发送者和接收者之间传输消息,而不是代理的内部行为。
  • 比如像”Queue Declare”、“Queue Delete”、“Queue Query”这些在Amqp 0-9-1支持的命令,在Amqp1.0中都被移除,并假设这些功能会在更高层(broker)参加。

对称层面:

  • Amqp 0-9-1 是一个典型的客户端/服务器通信协议。
  • Amqp 1.0 则是一个对称的协议,任何一端都可以注册为sender或是receiver,并且从如下Amqp 0.9.1和1.0之间的时序图对比也可以看出来。Amqp 1.0是完全双工的协议。从某种程度或者说网络编程的角度来说,实现的难度更大。

Amqp 1-0 鉴权时序图

Amqp 1-0 生产时序图

Amqp 1-0 消费时序图

Amqp 1.0 协议帧介绍

Amqp1.0 的协议帧由FrameHeader、ExtendedHeader、FrameBody组成。

  • FrameHeader 8个字节大小,包含长度、类型信息等
  • Extended header 可变宽度区域
  • FrameBody 是一个可变宽度的字节序列,其格式取决于帧类型

FrameHeader介绍

  • Size: FrameHeader的第0~3个字节包含帧大小。无符号的32位整数,为FrameHeader、ExtendedHeader、FrameBody的总和大小。如果大小小于8字节,则格式错误
  • DOFF: FrameHeader的第4个字节,这表示帧内Body的位置。
  • Type: FrameHeader的第5个字节,类型代码表示帧的格式和目的。根据帧的类型,帧头中的后续字节可能会被不同地解释。类型代码0x00表示该帧是AMQP帧。类型代码0x01表示该帧是SASL帧等。

Amqp 帧介绍

Amqp帧类型代码为0x00。对于Amqp帧来说,FrameHeader的第6字节和第7字节表示channel的编号。Frame Body 被定义为一个 performative 后跟一个不透明的 payload。表现形式必须是第open、begin、attach、flow、transfer、disposition、detach、end、close中定义的一个,并在AMQP类型系统中编码为描述的类型。帧体中剩余的字节构成了该帧的 payload。payload 的存在和格式由给定表现形式的语义定义。

SASL 帧介绍

Sasl帧类型代码为0x01。FrameHeader中的第6和第7字节应该被忽略。也不存在扩展头。所以DOFF固定位0x02。

与其他消息通信协议间的对比

Amqp与Mqtt的对比

Amqp和Mqtt都是应用层的消息传递协议,mqtt更加轻量,相对来说概念不如amqp那么丰富,同时mqtt头部消息更加短小。更加适用于低带宽、功耗较低的物联网设备

Amqp与Kafka协议的对比

AMQP是一种非常灵活的协议,可以用于各种类型的消息传递场景,包括点对点和发布-订阅模型。Kafka则专注于高吞吐量的流式处理,适用于数据管道和流式处理等场景。

Kafka的设计旨在提供高吞吐量和低延迟。AMQP的性能因实现和使用情况而异,但在大多数情况下,它的性能不如Kafka。

Kafka拥有强大的生态系统,包括流处理、数据湖、消息队列等多个应用场景。AMQP也有相应的生态系统和工具,但相对来说要小得多。

总得来说,尽管kafka存在性能上的优势,但kafka broker很难对外暴露。相较于kafka这种私有消息中间件协议,Amqp足够标准,更适合各种异构系统的对接。

AMQP协议相关的开源项目

rabbitmq

提到AMQP,就不得不提rabbitmq。RabbitMQ 是一个开源的消息代理和队列服务器,用于通过高级消息队列协议(AMQP)在分布式系统中实现消息传递。RabbitMQ 提供了一个可靠、高性能、可扩展和易于使用的消息传递平台,支持多种编程语言和平台。它最初是用 Erlang 语言编写的,因此具有良好的并发性能和容错能力。

所谓成也erlang,败也erlang,由于erlang语言生态的问题,有能力深入维护Rabbitmq的人员并不是很多,也是rabbitmq越来越不流行的一个原因。

Qpid

Apache Qpid(Quick Platform for Interactive Distributed Messaging)是一个开源的消息传递系统,它实现了高级消息队列协议(Advanced Message Queuing Protocol,AMQP)的多种版本。AMQP 是一种开放标准的应用层协议,用于消息传递的中间件,它可以实现跨平台、跨语言的消息通信。Qpid 项目的主要目标是提供一个可靠、可扩展和高性能的消息传递平台,帮助开发者更容易地构建分布式系统。主要的组件有

  • Qpid Broker:一个高性能、可扩展的AMQP消息代理,支持持久化、事务和安全认证等特性。Qpid Broker 提供了Java和C++两种实现。
  • Qpid Proton:一个轻量级的AMQP库,旨在为各种编程语言提供高性能的AMQP实现,提供了C和java的默认实现。此外Proton 还提供了其他编程语言如python的绑定。

但Qpid总得来说,比较重型,如果仅仅是想在原有的消息组件,如kafka/pulsar外面叠加一层Amqp可访问的能力,我相信proton是更好的选择。

Vertx-proton

Vert.x Proton 的目标是结合 Vert.x 的响应式编程模型和 Qpid Proton 的 AMQP 支持,以简化构建高性能、可扩展的、基于 AMQP 的分布式应用程序。Vert.x Proton 提供了一套简洁、易用的 API,可以让开发者在 Vert.x 应用程序中轻松地实现 AMQP 通信。

华为云IoT对AMQP的支持

在最初阶段华为云IoTDA主要支持HTTP协议,尽管这种方式已经能满足许多需求,但随着物联网技术的普及和发展,用户对于更加灵活和高效的通信方式的需求逐渐增强,华为云IoTDA逐渐丰富协议库,当前支持60+协议接入,为开发者和企业提供更加完善的解决方案。

在IoT应用对接场景中,华为云IoT现已新增了对AMQP的支持,与HTTP协议相比,AMQP协议具有以下优势

  1. 无需HTTP服务器:AMQP协议无需开发者搭建HTTP服务器,降低了项目成本,简化了系统架构。可以部署在各种类型的设备,包括手机、平板、智能家居设备等,进一步拓宽了物联网应用的领域。
  2. 低延迟、高效率:AMQP协议采用二进制传输,降低了数据传输所需的带宽,提高了传输速度,降低了延迟。
  3. 强大的消息队列功能:AMQP协议具有优秀的消息队列功能,支持点对点和发布订阅模式,确保消息的可靠传输和顺序处理。

通过支持AMQP协议,华为云IoT为开发者和企业提供了更加灵活和高效的通信方式,使得物联网应用得以在各个领域得到更广泛的推广和应用。

Amqp实战:使用qpid-proton python 消费华为云IoTDA的Amqp消息

首先通过pip 安装依赖包

pip install python-qpid-proton

最简单的消费者demo, consumer.py如下

import sys
from proton.handlers import MessagingHandler
from proton.reactor import Container
class AMQPConsumer(MessagingHandler):
 def __init__(self, server_url, target_address):
 super(AMQPConsumer, self).__init__()
 self.server_url = server_url
 self.target_address = target_address
 def on_start(self, event):
        conn = event.container.connect(self.server_url)
 event.container.create_receiver(conn, self.target_address)
 def on_message(self, event):
 print(f"Received message: {event.message.body}")
 event.connection.close()
if __name__ == "__main__":
 server_url = "amqp://localhost:5672"
 target_address = "example_queue"
 try:
 Container(AMQPConsumer(server_url, target_address)).run()
 except KeyboardInterrupt:
 sys.exit(0)

我们可以使用这个producer.py验证 consumer.py可用

import sys
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
class AMQPProducer(MessagingHandler):
 def __init__(self, server_url, target_address, message_body):
 super(AMQPProducer, self).__init__()
 self.server_url = server_url
 self.target_address = target_address
 self.message_body = message_body
 def on_start(self, event):
        conn = event.container.connect(self.server_url)
 self.sender = event.container.create_sender(conn, self.target_address)
 def on_sendable(self, event):
        message = Message(body=self.message_body)
 event.sender.send(message)
 print(f"Sent message: {message.body}")
 event.connection.close()
if __name__ == "__main__":
 server_url = "amqp://localhost:5672"
 target_address = "example_queue"
 message_body = "Hello, AMQP 1.0!"
 try:
 Container(AMQPProducer(server_url, target_address, message_body)).run()
 except KeyboardInterrupt:
 sys.exit(0)

为了能连接上华为云IoTDA的Amqp接入点,我们还需要给consumer.py配置用户名、密码。如下为样例代码,具体连接的信息、凭据如何获得可参考: https://support.huaweicloud.com/devg-iothub/iot_01_00100_2.html。注意,url也从amqp修改为了amqps

import sys
from proton import Message, SSLDomain
from proton.handlers import MessagingHandler
from proton.reactor import Container
class AMQPConsumer(MessagingHandler):
 def __init__(self, server_url, target_address, username, password, cert_file, key_file):
 super(AMQPConsumer, self).__init__()
 self.server_url = server_url
 self.target_address = target_address
 self.username = username
 self.password = password
 self.cert_file = cert_file
 self.key_file = key_file
 def on_start(self, event):
 ssl_domain = SSLDomain(mode=SSLDomain.MODE_CLIENT)
 ssl_domain.set_credentials(self.cert_file, self.key_file, None)
        conn = event.container.connect(self.server_url, user=self.username, password=self.password, ssl_domain=ssl_domain)
 event.container.create_receiver(conn, self.target_address)
 def on_message(self, event):
 print(f"Received message: {event.message.body}")
 event.connection.close()
if __name__ == "__main__":
 server_url = "amqps://localhost:5671" # 注意 'amqps',它表示使用 SSL/TLS 连接
 target_address = "example_queue"
    username = "your_username"
    password = "your_password"
 cert_file = "path/to/your/certificate.pem"
 key_file = "path/to/your/private_key.pem"
 try:
 Container(AMQPConsumer(server_url, target_address, username, password, cert_file, key_file)).run()
 except KeyboardInterrupt:
 sys.exit(0)

该样例代码已上传到gitee

总结与展望

总体来说,AMQP作为一种应用层协议,在消息传递和异构系统之间的通信方面提供了非常灵活和可靠的解决方案。与其他消息传递协议相比,AMQP具有丰富的功能和灵活的设计,适用于各种类型的消息传递场景。

在使用AMQP时,我们可以选择现有的开源实现,如RabbitMQ、Qpid等,也可以自行实现AMQP的相关组件。通过这些实现,我们可以轻松地在不同的应用程序、语言和平台之间进行消息传递,并实现可靠、高效、安全的通信。

随着物联网、云计算和大数据等技术的发展,AMQP的应用场景越来越广泛,比如在IoT设备、大数据流处理、分布式系统等方面都得到了广泛应用。未来,AMQP将继续发挥重要作用,推动各种异构系统之间的互联互通,带来更加便捷和高效的消息传递体验。

 

点击关注,第一时间了解华为云新鲜技术~

与物联网常见协议之Amqp协议及使用场景解析相似的内容:

物联网常见协议之Amqp协议及使用场景解析

摘要:本文围绕AMQP协议,为大家详细解析AMQP协议、核心技术亮点、多协议之间的对比以及使用实践。 本文分享自华为云社区《物联网常见协议之Amqp协议及使用场景解析》,作者:张俭。 引言 本文围绕AMQP协议,为大家详细解析AMQP协议、核心技术亮点、多协议之间的对比以及使用实践,并介绍华为云Io

深入分析四层/七层网关

1 简要介绍 随着云计算、大数据和物联网技术的迅猛发展,网络通信的复杂性和需求日益增加。在这种背景下,网关技术作为网络通信中的重要组成部分,扮演着关键的角色。 作为连接不同网络或协议的桥梁,四层网关和七层网关是两种常见且重要的类型。本文将对这两种网关进行深入分析和对比,让同学们更好地理解它们的工作原

【Nano Framework ESP32篇】WS2812 彩色灯带实验

地球人皆知,许多物联网教程作者的心中都深爱着一灯大师,所以第一个例程总喜欢点灯,高级一点的会来个“一闪一闪亮晶晶”。老周今天要扯的也是和灯有关的,但不单纯地点个灯,那样实在不好玩,缺乏乐趣。老周打算舞个龙灯,哦不,是用 LED 彩色灯带给伙伴们整点炫酷乐子。 说到这LED彩灯,咱们常见到的有两类:

物联网浏览器(IoTBrowser)-基于计算机视觉开发的应用“智慧眼AIEye”

一、起因 最近毕业在家:),准备筹划社区运营和IoTBrowser升级的事务,遇到了一系列物业管理上的问题,本来出于好心提醒物业人员,结果反被误认为是打广告推销的,当时被激怒一下,后面一想也许这也是一个普遍存在的问题,正好IoTBrowser缺少落地的应用场景,遂又撸起袖子搞了一个AI工具。以下是本

物联网 IOT 设备如何脱离信息孤岛?

目前在家庭物联网这一块,绝大部分的电子消费品都是基于wifi联网的设备。从商家那里达到消费者手中之后,简单开机使用无法体现其全部价值,还是需要经过消费者给设备配网的过程,把设备从信息孤岛接入互联互通的世界。

如何为物联网设备注入“华为云+鸿蒙DNA”?

大量物联网设备需要新的操作系统来支撑,这是鸿蒙发力的最佳机会,物联网迎来新的机遇与挑战。

第四届物联网与机器学习国际学术会议(IoTML 2024)

2024年第四届物联网与机器学习国际学术会议(IoTML 2024)将于2024年8月9-11日在中国南昌召开。会议将围绕着物联网和机器学习开展。

阿里云物联网平台专用工具详细说明

阿里云物联网平台专用工具基本涵盖了阿里云物联网平台提供你主要管理功能,可以方便创建产品、设备、物模型,查看设备实时属性,事件,发送服务和查看服务日志等等

阿里云物联网平台设备模拟器

在使用阿里云物联网平台过程中,如果开始调试没有实际的物理设备,可以考虑在阿里云物联网平台使用官方自带的模拟器进行调试。不过也可以通过叶帆科技开发的阿里云物联网平台设备模拟器AliIoTSimulator进行调试,AliIoTSimulator可以独立运行(需要单独加载物模型配置信息),也可以由阿里云物联网平台专用工具(AliIoTTools)直接启动。

老板要的物联网可视化大屏,我30分钟就搞定了

摘要:不知道大家在生活中有没有见过一些非常酷炫的可视化大屏应用? 本文分享自华为云社区《老板要的物联网可视化大屏,我30分钟就搞定了》,作者:华为云社区精选 。 不知道大家在生活中有没有见过一些非常酷炫的可视化大屏应用? 随着数字化经济的发展,我们对数据的呈现形式要求也越来越高,很多老板动不动就让我