参考资料:RabbitMQ tutorial - Remote procedure call (RPC) — RabbitMQ
Remote Procedure Call
What this tutorial focuses on
在 RabbitMQ学习笔记03:Work Queues 中我们学会了如何使用work queue
在多个worker
进程中分发耗时的任务。
但是如果我们需要在远程的电脑上执行一个函数并等待其结果呢?那就是完全不同的情况了。这种模式我们称之为远程过程调用 Remote Procedure Call
,简称RPC
。
在本博文中,我们将会使用RabbitMQ
去构建一个RPC
系统:一个客户端和一个可扩展的服务器。实际上由于我们并没有任何值得分发的耗时任务,因此我们将会创建一个假的RPC
服务用于返回斐波那契数列。
Client interface
为了描述一个RPC
服务是如何被使用的,我们将会创建一个简单的客户端类。它会暴露一个名叫call
的方法,该方法会发送一个RPC
请求并阻塞直到收到回答:
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)
A note on RPC
尽管RPC
在计算机技术中是一种非常常见的模型,但是它经常备受批评。当一个程序员没有办法知道是否函数的调用是本地的或者如果它是一个慢RPC
,那么问题就会产生。这样的困惑会导致系统变得不稳定以及增加非必要的排错复杂度。滥用RPC不仅不会简化软件,反而会导致出现无法维护的面条式代码 spaghetti code
。
永远将以下内容记在心里:
- 哪些函数调用是本地的,哪些函数调用是远程的,这些一定要确保是清晰的。
- 文档要做好,确保不同的组件之间的依赖关系是清晰的。
- 处理错误案例。当一个
RPC
服务器挂了很长一段时间以后,客户端会是什么样的反应?
如果不确定自己是否要使用RPC
。如果可以的话,可以尝试使用异步管道,结果会被异步地推送到下一个计算阶段。
Callback queue
一般来说,通过RabbitMQ
来实现RPC
是简单的。一个客户端发送请求消息,然后一个服务器回复响应消息。为了接收响应,客户端需要在请求的时候发出callback
队列的地址。
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue
channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = callback_queue,
),
body=request)
# ... and some code to read a response message from the callback_queue ...
Message properties
AMQP 0-9-1 预先定义了14种消息属性可以伴随消息一起发送出去。大多数属性很少使用到,除了以下即可:
- delivery_mode: 标记一个消息是永久或者临时存在的。我们在消息的持久化时有使用到。
- content_type: 用于描述编码时的MIME类型。比如常见的JSON编码
application/json
。 - reply_to: 一般用于命名一个
callback
队列。 - correlation_id: 用于关联
RPC
的请求和响应。
Correlation id
在上面列出的方法种我们建议每次RPC
请求都创建一个callback
队列。这样是非常低效的,更好的办法是基于客户端来创建callback
队列而不是基于每次RPC
请求。
这会带来一个新的问题,在callback
队列中接收到的响应消息,我们不知道对应的是哪条请求消息。因此我们需要使用到correlation_id
属性,我们会针对每条消息单独设一个该属性的唯一的取值,然后当我们在回调队列中收到消息的时候,我们会关注消息中的这个属性的值,如果correlation_id
的值相同就表示它们匹配的是同一条消息的请求和响应。如果我们发现correlation_id
属性的值是我们所不知道的,那么它就不属于我们的请求,我们就可以安全丢弃它们。
你可能会问,为什么我们要丢弃回调队列中未知的消息而不是报错?这是因为在服务器端可能会出现系统错乱的情况(race condition)。尽管可能性很小,存在一种可能在RPC
服务器发送给我们回答之后至在发送请求的确认消息之前这段时间,RPC
刚好宕机了。这种情况下,当RPC
服务器重启之后,它会重新处理一次请求。这就是为什么在客户端我们必须优雅地处理重复的响应,而且RPC
理想情况下应该是幂等的。
Summary
我们的RPC
的工作流程:
- 当客户端启动的时候,它会创建一个匿名的独有的回调队列。
- 对于一次
RPC
请求,客户端会发送一条消息并伴随2个消息属性。reply_to
用于指向回调队列的地址,correlation_id
用于标志每次请求消息。 - 请求会被发往
rpc_queue
队列。 RPC
的服务器端会作为消费者
在该队列上等待请求(消息)。一旦收到请求就会对其处理,然后将返回的消息发往reply_to
所指向的回调对类中。返回的消息会包含correlation_id
,其值和请求时保持一致。- 客户端将会在回调队列上等待数据。如果有新的数据抵达,并且
correlation_id
和之前请求时发出的消息中的correlation_id
值一致的话,就会处理这条响应消息;否则若不一致,则丢弃消息。
有意思的一点,在RPC
的C/S架构中,客户端和服务器端均作为消息队列的生产者和消费者。
Putting it all together
rpc_server.py
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" [x] Awaiting RPC requests")
channel.start_consuming()
服务器端的代码很直接:
- 像之前一样,我们建立connection、channel,并创建名为
rpc_queue
的队列。 - 我们声明了一个斐波那契数列函数,它只能接受正整数。我们不要传递太大的数,否则程序会变得很慢。
- 我们定义了回调函数
on_request
给basic_consume
使用,这是RPC
服务器的核心部分。当rpc_queue
队列中有消息的时候它就会被执行,用于发送响应给客户端。 - 在服务器端我们可能会想要运行多个
rpc_server.py
进程,为了让空闲的进程可以立即处理请求,我们使用了prefetch_count
。
rpc_client.py
#!/usr/bin/env python
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)
self.response = None
self.corr_id = None
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
self.connection.process_data_events(time_limit=None)
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
客户端的代码则会稍微有点复杂:
- 建立connection、channel,并创建名为随机独有的回调队列,将该队列的名字保存到
callback_queue
。 - 我们订阅
callback_queue
,这样我们就可以接收RPC响应。 - 每次有响应消息回来的时候,
on_response
回调函数就会被执行,完成一个很简单的工作,对于每条响应的消息它会检查是否correlation_id
是我们在找的那个。如果是的话,它会将响应保存在self.response
并且离开消费循环。 - 接下来我们定义我们的主方法
call
——它来执行实际的RPC
请求。 - 在
call
方法中我们生成了correlation_id
并保存到self.corr_id
中。on_response
回调函数会基于这个值获取合适的响应。 - 同样在
call
方法中发布消息的时候,我们包含了2个属性reply_to
和correlation_id
。 - 最后我们等待合适的响应到达,然后将该消息返回给用户。
接下来就可以执行代码了,打开第一个终端运行python rpc_server.py
,打开第二个终端运行python rpc_client.py
。
两个终端最终结果如下
# 第一个终端
[root@rabbitmq-01 code]# python rpc_server.py
[x] Awaiting RPC requests
[.] fib(30)
# 第二个终端
[root@rabbitmq-01 code]# python rpc_client.py
[x] Requesting fib(30)
[.] Got 832040
代码中所呈现出来的设计不是唯一可能的RPC
服务的实现,但是它有一些重要的优点:
- 如果
RPC
服务太慢的话我们可以通过横向扩展的方式,在新的console窗口中再运行一个rpc_server.py
进程。 - 在客户端方面,
RPC
规定仅发送和接收1条消息。没有要求像queue_declare
这样的异步调用。这样的结果对于一次RPC
请求,RPC
客户端只需要一次网络来回(network round trip)。
这里的代码只是示例代码而已,过度简化了,有一些复杂且重要的问题没有解决,比如说:
- 如果没有server端运行的话,那么客户端会是什么反应?
- 客户端是否应该针对RPC设置超时时长?
- 如果服务器出现故障并且抛出了一个异常,那么这个异常是否应该被转发给客户端?
- 处理数据前是否判断消息的有效性,比如我们只允许接收100以内的数等等这类的边界检查机制。
最后,tutorial在这里提到了 management UI ,这是一个以Web(GUI)形式展示RabbitMQ
数据以及提供一些操作的插件,蛮适合不会或者不经常写代码操作RabbitMQ
的人员,比如运维工程师。但是像生产者和消费者这种,还是得通过实际的代码来实现的。由此也可以看出,很多开源软件的应用,想要入门并且学好、用好的话,其实运维和开发相关的知识是都需要掌握的。