此前,我在做跨语言调用时,用的是 Facebook 的 Thrift,挺轻量的,还不错。
Thrift是一种接口描述语言和二进制通讯协议,它被用来定义和创建跨语言的服务。它被当作一个远程过程调用(RPC)框架来使用,是由Facebook为“大规模跨语言服务开发”而开发的。它通过一个代码生成引擎联合了一个软件栈,来创建不同程度的、无缝的跨平台高效服务,可以使用C#、C++(基于POSIX兼容系统)Cappuccino、Cocoa、Delphi、Erlang、Go、Haskell、Java、Node.js、OCaml、Perl、PHP、Python、Ruby和Smalltalk编程语言开发。 2007由Facebook开源,2008年5月进入Apache孵化器, 2010年10月成为Apache的顶级项目。
最近的项目中也有类似的需求,这次打算使用 Google 的 gRPC,原因是 gRPC 知名度也很高,之前一直想用但没有场景,加上 AspNetCore 的文档里有介绍,看起来官方也是推荐使用这种方式进行 RPC 调用。
所以就果断开始了 gRPC 实践,最终做出来的效果还是可以的(这是后话)。
本文主要介绍 C# 与 Python 之间,使用 gRPC 进行跨语言调用。
远程过程调用(Remote Procedure Call,缩写为 RPC)是一个计算机通信协议。该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程。 比如 Java RMI(远程方法调用(Remote
Method Invocation)。能够让在某个java虚拟机上的对象像调用本地对象一样调用另一个java虚拟机中的对象上的方法)。
从上图可以看出, RPC 本身是 client-server模型,也是一种 request-response 协议。
有些实现扩展了远程调用的模型,实现了双向的服务调用,但是不管怎样,调用过程还是由一个客户端发起,服务器端提供响应,基本模型没有变化。
服务的调用过程为:
- client调用client stub,这是一次本地过程调用
- client stub将参数打包成一个消息,然后发送这个消息。打包过程也叫做 marshalling
- client所在的系统将消息发送给server
- server的的系统将收到的包传给server stub
- server stub解包得到参数。 解包也被称作 unmarshalling
- 最后server stub调用服务过程. 返回结果按照相反的步骤传给client
gRPC 是一种与语言无关的高性能远程过程调用 (RPC) 框架。
前文有说过,这个是 Google 开发的,以下是 AspNetCore 的文档中,对 gRPC 的介绍。
gRPC 的主要优点是:
- 现代高性能轻量级 RPC 框架。
- 协定优先 API 开发,默认使用协议缓冲区,允许与语言无关的实现。
- 可用于多种语言的工具,以生成强类型服务器和客户端。
- 支持客户端、服务器和双向流式处理调用。
- 使用 Protobuf 二进制序列化减少对网络的使用。
这些优点使 gRPC 适用于:
- 效率至关重要的轻量级微服务。
- 需要多种语言用于开发的 Polyglot 系统。
- 需要处理流式处理请求或响应的点对点实时服务。
gRPC 使用 .proto
文件来定义接口和数据类型,同时提供了通过 .proto
文件生成不同语言调用代码的工具。
本文的项目是使用 C# 调用 Python 写的服务,这个服务是一个大语言模型。
在本项目中,C# 项目作为客户端,Python 项目作为服务端。
.proto
文件第一步,要先编写用于定义接口和数据结构的 .proto
文件。
将下面代码保存到 chat.proto
文件里,接下来在要接入 gRPC 的每个项目里,都要复制一份这个文件。
syntax = "proto3";
import "google/protobuf/wrappers.proto";
option csharp_namespace = "AIHub.Blazor";
package aihub;
service ChatHub {
rpc Chat (ChatRequest) returns (ChatReply);
rpc StreamingChat (ChatRequest) returns (stream ChatReply);
}
message ChatRequest {
string prompt = 1;
repeated google.protobuf.StringValue history = 2;
int32 max_length = 3;
float top_p = 4;
float temperature = 5;
}
message ChatReply {
string response = 1;
}
这里定义了两个 Chat 方法,其中一个是一元调用,一个是流式输出。
gRPC 服务可以有不同类型的方法,服务发送和接收消息的方式取决于所定义的方法的类型
都挺好理解的,在 proto
定义里面也很易懂,stream
放在哪里,哪里就是流式。
stream
放在请求参数,那客户端输入就是流式的,放在返回值前面,那服务端的返回就是流式。都放就代表双向流式。
接着又定义了用到的数据结构,一个输入参数 ChatRequest
,一个返回参数 ChatReply
。
string prompt = 1;
这里赋值的意思是这个参数的位置,不是定义变量的赋值😂,第一次用差点整蒙了。
先在 Python 项目里,把 gRPC 服务器搭建起来。
把上面的 chat.proto
文件,复制到 Python 项目的目录里面。
Python 项目需要安装依赖
pip install grpcio
pip install grpcio-tools
执行命令生成代码
python -m grpc_tools.protoc -I . --python_out=. --pyi_out=. --grpc_python_out=. ./chat.proto
上面的命令,可以根据 proto
文件,生成以下代码
编写服务器代码
from concurrent import futures
import logging
import grpc
import chat_pb2
import chat_pb2_grpc
class Greeter(chat_pb2_grpc.ChatHubServicer):
def Chat(self, request: chat_pb2.ChatRequest, context):
response, history = model.chat(
tokenizer,
request.prompt,
history=[],
max_length=request.max_length,
top_p=request.top_p,
temperature=request.temperature)
return chat_pb2.ChatReply(response=response)
def StreamingChat(self, request: chat_pb2.ChatRequest, context):
past_key_values, history = None, []
current_length = 0
for response, history, past_key_values in model.stream_chat(
tokenizer, request.prompt, history=history,
past_key_values=past_key_values,
return_past_key_values=True):
print(response[current_length:], end="", flush=True)
yield chat_pb2.ChatReply(response=response)
current_length = len(response)
def serve():
port = '50051'
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
chat_pb2_grpc.add_ChatHubServicer_to_server(Greeter(), server)
server.add_insecure_port('[::]:' + port)
server.start()
print("Server started, listening on " + port)
server.wait_for_termination()
if __name__ == '__main__':
logging.basicConfig()
serve()
前面在 proto
里定义的俩方法:Chat
, StreamingChat
要自己实现,这里我直接调用 Transformer ,一元方法就直接返回,流式输出使用 yield 生成结果。
本文把 Python 项目作为 gRPC 的服务端供 AspNetCore 项目调用
反过来也是没问题的
Python 作为客户端的代码如下
import logging
import grpc
import chat_pb2
import chat_pb2_grpc
def run():
print("Will try to greet world ...")
with grpc.insecure_channel('localhost:50051') as channel:
stub = chat_pb2_grpc.ChatHubStub(channel)
response = stub.Chat(chat_pb2.ChatRequest(prompt='你好'))
print("Greeter client received: " + response.message)
if __name__ == '__main__':
logging.basicConfig()
run()
AspNetCore 与 gRPC 的集成非常容易
先把上面的 proto
文件添加到项目中
在本项目中我放到 Protos/chat.proto
中
先安装 nuget 包
dotnet add Grpc.AspNetCore
然后编辑 .csproj
添加以下配置
<ItemGroup>
<Protobuf Include="Protos\chat.proto" GrpcServices="Both" />
</ItemGroup>
其中 GrpcServices="Both"
表示同时生成服务端和客户端代码
完成之后,Build 项目,然后就会在 obj\Debug\net6.0\Protos
目录下生成对应的代码
不过这些代码不需要直接引用,编译器会自动处理。
编辑 Program.cs
文件
注册一个 gRPC 客户端
builder.Services.AddGrpcClient<ChatHub.ChatHubClient>(options => {
options.Address = new Uri(builder.Configuration.GetValue<string>("gRPCServer"));
});
如果要注册多个相同的客户端,可以加个名字区分
builder.Services.AddGrpcClient<ChatHub.ChatHubClient>("client1", options => {
options.Address = new Uri(builder.Configuration.GetValue<string>("gRPCServer"));
});
builder.Services.AddGrpcClient<ChatHub.ChatHubClient>("client2", options => {
options.Address = new Uri(builder.Configuration.GetValue<string>("gRPCServer"));
});
使用的时候直接注入就完事了。
如果是命名客户端,就先注入 GrpcClientFactory
对象
var clinet1 = grpcClientFactory.CreateClient<ChatHub.ChatHubClient>("client1");
var clinet2 = grpcClientFactory.CreateClient<ChatHub.ChatHubClient>("client2");
发起一个请求(一元模式)
var request = new ChatRequest { Prompt = "哈喽" };
ChatReply reply = ChatClient.Chat(request);
读取流式返回数据
using (var call = ChatClient.StreamingChat(request)) {
await foreach (var resp in call.ResponseStream.ReadAllAsync()) {
Console.Write(resp.Response);
}
}
简简单单,搞定~
同样很简单
注册服务、配置中间件
builder.Services.AddGrpc();
app.MapGrpcService<ChatService>();
ChatService 代码
public class ChatService : ChatHub.ChatHubBase {
private readonly ILogger<ChatService> _logger;
public ChatService(ILogger<ChatService> logger) {
_logger = logger;
}
public override Task<ChatReply> Chat(ChatRequest request, ServerCallContext context) {
return new Task<ChatReply>(() => new ChatReply { Response = $"你好,{request.Prompt}" });
}
public override async Task StreamingChat(ChatRequest request, IServerStreamWriter<ChatReply> responseStream,
ServerCallContext context) {
for (var i = 0; i < 5; i++) {
await responseStream.WriteAsync(new ChatReply { Response = $"{i}" });
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
}
同样是把 protobuf 里定义的俩方法实现了一遍。
代码比较易懂,跟上面的 Python 版差不多,这里就不重复解释了。
gRPC 使用起来非常的丝滑,目前来说也没遇到什么坑,可以非常平滑地与已有项目集成,如果有类似的场景,强烈推荐尝试一下 gRPC !
gRPC 的功能很多,本文仅介绍了最基本的使用,更多的请阅读文档,详细有了本文的基础铺垫,读者再阅读文档深入使用时,会比较轻松上手。
PS:最近这个使用到 gRPC 的项目,接下来我会写个小结博客介绍一下~