java实现朴素rpc

java,实现,朴素,rpc · 浏览次数 : 19

小编点评

你好,世界 ```java public static void main(String[] args) throws UnknownHostException, IOException, Exception { // 服务端启动 new Thread(new Server().apply(8080)).start(); // 模拟调用 ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10); Client client = new Client(new Codec(new Socket("127.0.0.1", 8080))); // for循环模拟100个请求 for (int i = 0; i < 100; i++) { // 创建 FutureTask并提交请求 FutureTask<Object> call = client.start( new Header("EchoService", "echo"), new EchoRequest("~hello") ); // 获取响应 EchoResponse rsp = (EchoResponse) call.get(); // 打印响应内容 System.out.println(rsp.content); } // 关闭连接 client.close(); } ``` **排版说明:** ``` public static void main(String[] args) throws UnknownHostException, IOException, Exception { // 服务端启动 new Thread(new Server().apply(8080)).start(); // 模拟调用 ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10); Client client = new Client(new Codec(new Socket("127.0.0.1", 8080))); // for循环模拟100个请求 for (int i = 0; i < 100; i++) { // 创建 FutureTask并提交请求 FutureTask<Object> call = client.start( new Header("EchoService", "echo"), new EchoRequest("~hello") ); // 获取响应 EchoResponse rsp = (EchoResponse) call.get(); // 打印响应内容 System.out.println(rsp.content); } // 关闭连接 client.close(); } ``` **运行结果:** ``` ~hello ~hello ~hello ~hello ~hello ~hello ~hello ~hello ~hello ~hello ~hello ```

正文

五层协议中,RPC在第几层?

五层协议
应用层
传输层
网络层
链路层
物理层

我不知道,我要去大气层!


远程过程调用(RPC),比较朴素的说法就是,从某台机器调用另一台机器的一段代码,并获取返回结果。

这之前的一个基层问题就是进程间通信方式(IPC),从是否设计网络通信分为:

  • 基于信号量和共享内存实现的管道和消息队列和其本身(不涉及IP端口)
  • Socket(IP端口)

和共享内存不同,Socket实现不并不是只依靠内存屏障,它还额外需要物理/虚拟网卡设备。

关于网卡,只需要知道网卡可以帮助我们从网络中读写信息,这也是RPC的基础。

jRPC实现

远程过程调用,不如先来研究调用。

回声服务实现

先来一段普通的代码。

public class EchoService {

	public static EchoResponse echo(EchoRequest req) throws Exception {
		return new EchoResponse("echo:" + req.content);
	}

	public static void main(String[] args) throws Exception {
		System.out.println(EchoService.echo(new EchoRequest("ping")).content); // echo:ping
	}
}

class EchoRequest {
	String content;

	public EchoRequest(String content) {
		this.content = content;
	}
}

class EchoResponse {
	String content;

	public EchoResponse() {
	}

	public EchoResponse(String content) {
		this.content = content;
	}
}

回声服务对传入参数直接返回,就像你在山谷中的回声一样。

现在如果使用远程传输,我们需要给网卡注册自己的IP和端口,以便和服务端建立连接。连接建立后,我们还需要确定数据如何传输。

服务端实现

为了朴素性,我们假设只有10台机器和我们进行连接。

public Runnable apply(Integer port) {
	return () -> {
		try {
			try (ServerSocket serverSocket = new ServerSocket(port)) {
				for (;;) {
					Socket clientSocket = serverSocket.accept();
					new Thread(() -> {
						// 数据如何传输
					}).start();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	};
}

根据Socket的文档,我们可以很快迭代出一台服务器应该如何与他的客户端连接。对于每个客户端,我们提供了独立的线程支持两台机器间的长连接。

试想一下,此时的长连接如果是百万甚至千万,为每个连接分配一个线程不可取,有什么好办法可以支持到呢?这个问题这里不解了,有兴趣自行研究下。

Serializable

一说起序列化,最怕异口同声json

使用json就难免会使用到 第三方库,如果没有必要,并不希望引入。除了json外,java其实本身就有Serializable实现,他和synchronized一样,java官方提供并维护。

public class EchoService {

	public static EchoResponse echo(EchoRequest req) throws Exception {
		throw new UnsupportedOperationException();
	}
}

class EchoRequest implements Serializable {
	String content;

	public EchoRequest(String content) {
		this.content = content;
	}
}

class EchoResponse implements Serializable {
	String content;

	public EchoResponse() {
	}

	public EchoResponse(String content) {
		this.content = content;
	}
}

除了参数外,一个rpc需要知道,ip、端口、服务名、方法名。

ip和端口在调用时应该已经知道,为此还需要支持一个header来完成服务名和方法名的指定。

class Header implements Serializable {
	String stub;
	String method;

	public Header(String stub, String method) {
		this.stub = stub;
		this.method = method;
	}
}

通过编码解码器对Serializable的数据编码和解码。

public class Codec {
	Socket clientSocket;
	ObjectInputStream objectInputStream;
	ObjectOutputStream objectOutputStream;

	public Codec(Socket clientSocket)
		throws Exception {
		this.clientSocket = clientSocket;
		this.objectOutputStream = new ObjectOutputStream(clientSocket.getOutputStream());
		this.objectInputStream = new ObjectInputStream(clientSocket.getInputStream());
	}

	public Header header() throws Exception {
		return (Header) this.objectInputStream.readObject();
	}

	public Object read() throws Exception {
		return this.objectInputStream.readObject();
	}

	public void write(Header header, Object obj) throws Exception {
		this.objectOutputStream.writeObject(header);
		this.objectOutputStream.writeObject(obj);
	}
}

回到服务端,将空缺的地方通过反射补全。

Codec codec = new Codec(clientSocket);
for (;;) {
	Header header = codec.header();
	Class<?> stub = Class.forName(header.stub);
	Map<String, Method> methods = Arrays.asList(stub.getDeclaredMethods()).stream()
		.collect(Collectors.toMap(t -> t.getName(), t -> t));
	Method method = methods.get(header.method);
	codec.write(header, method.invoke(null, header, codec.read()));
}

通过codec解码stub和method来找到对应的方法,调用对应方法,获取结果后再通过编码返回客户端。

高性能客户端

想一下,如果一个客户端发送了10个请求,其中第2个由于种种原因被阻塞掉,后面的请求会被卡在阻塞的请求之后而无法获得响应。

简单的处理方法,就是抽象掉调用过程,并给其唯一标识。需要一个map来存全部的调用请求。

class Call {
    Long seq;
    Object req;
    Object rsp;
    Thread thread;

    public Call(Long seq, Object req) {
        this.seq = seq;
        this.req = req;
    }
}

对call抽象后,对client也就迎刃而解了。

我知道了,map,用map解。

Long seq;
Codec codec;
ReentrantLock clock;
Map<Long, Call> calls;
ReentrantLock metux;

在map之上提供对seq的操作。

Call register(Call call) {
	try {
		clock.lock();
		call.seq = seq;
		calls.put(seq, call);
		seq++;
		return call;
	} finally {
		clock.unlock();
	}
}

Call remove(Call call) {
	try {
		clock.lock();
		call.seq = seq;
		calls.remove(seq);
		return call;
	} finally {
		clock.unlock();
	}
}

对服务端的响应监听,唤醒阻塞的线程。

void receive() throws Exception {
	for (;;) {
		Header header = codec.header();
		Call call = calls.remove(header.seq);
		Object rsp = codec.read();
		call.rsp = rsp;
		LockSupport.unpark(call.thread);
	}
}

最后就是发起客户端调用的代码。

FutureTask<Object> start(Header header, Object req) throws Exception {
	Call call = new Call(seq, req);
	try {
		metux.lock();
		final Call fcall = register(call);
		header.seq = call.seq;
		codec.write(header, req);
		FutureTask<Object> task = new FutureTask<>(() -> {
			fcall.thread = Thread.currentThread();
			LockSupport.park();
			return fcall.rsp;
		});
		task.run();
		return task;
	} finally {
		metux.unlock();
	}
}

你好,世界

public static void main(String[] args) throws UnknownHostException, IOException, Exception {
	new Thread(new Server().apply(8080)).start(); // 服务端启动
	// 模拟调用
	ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
	Client client = new Client(new Codec(new Socket("127.0.0.1", 8080)));
	for (int i = 0; i < 100; i++) {
		newFixedThreadPool.submit(() -> {
			try {
				FutureTask<Object> call = client.start(
					new Header("EchoService", "echo"),
					new EchoRequest("~hello"));
				EchoResponse rsp = (EchoResponse) call.get();
				System.out.println(rsp.content);
			} catch (Exception e) {
				e.printStackTrace();
			}
		});
	}
}

Output

RPC echo~hello 0
RPC echo~hello 1
RPC echo~hello 2
RPC echo~hello 3
RPC echo~hello 4
RPC echo~hello 6
RPC echo~hello 5
RPC echo~hello 7
RPC echo~hello 9
RPC echo~hello 8

至此,只是实现了rpc的通信过程,完成度比较高。

  • 针对大流量的服务端还有优化空间,比如NIO的使用来管理长连接会更加有效。
  • 没有实现注册中心。

与java实现朴素rpc相似的内容:

java实现朴素rpc

远程过程调用(RPC),比较朴素的说法就是,从某台机器调用另一台机器的一段代码,并获取返回结果。 实现了rpc的通信过程,完成度比较高。 针对大流量的服务端还有优化空间,比如NIO的使用来管理长连接会更加有效。

Java实现管线拓扑关系连通性分析

本文详细介绍了Java实现管线拓扑关系连通性的方法,并给出了详细的代码示例;同时详细介绍了深度优先搜索(DFS)和广度优先搜索(BFS)的联系与区别。

二维数组与稀疏数组之间的转换

JAVA实现二维数组与稀疏数组之间的转换 一、什么是稀疏数组? 稀疏数组(Sparse array) ,所谓稀疏数组就是数组中大部分的内容值都未被使用(或都为零),在数组中仅有少部分的空间使用。因此造成内存空间的浪费,为了节省内存空间,并且不影响数组中原有的内容值,我们可以采用一种压缩的方式来表示稀

数组模拟单向队列的思路及代码

JAVA实现数组模拟单向队列的思路及代码 一、什么是队列? 队列是一种特殊的线性表 ,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。 进行插入操作的端称为队尾,进行删除操作的端称为队头。 队列中没有元素时,称为

数组模拟环形队列的思路及代码

JAVA实现数组模拟环形队列的思路及代码 前言 在对Java实现数组模拟队列零了解的情况下,建议先去阅读《JAVA实现数组模拟单向队列的思路及代码》一文,可以辅助理解本文核心思想。 一、环形数组队列 实现:让数组达到复用的效果,即:当我们从数组队列中取出了数据,那取出数据后后这个空间可以再次使用。

节点加入到单链表时按需求排序

JAVA实现节点加入到单链表时按需求排序 回顾 在上文《带头节点的单链表的思路及代码实现(JAVA)》中我们想要去实现让数据节点不考虑加入顺序实现数据节点排序效果。 那么我们要如何实现这一需求呢? 一、实现思路 ①理论思路 假设我们要根据数据节点的ID进行排序,那么我们可以通过使用待增加的节点id逐

JAVA实现单链表修改和删除数据节点

JAVA实现单链表修改和删除数据节点 一、修改单链表中的一个节点 ①实现思路 因为带头节点的链表中头节点的next域不能发生改变(始终指向单链表的头节点),否则将找不到该链表。所以我们需要先找一个辅助节点temp来进行节点代理操作。 通过遍历链表,使辅助节点temp后移,找到要修改的节点。 然后进行

java实现 微信公众号推送消息 ,cv 就可运行!!!

一,注册公众号 1,官网地址:申请测试公众号 地址: 微信公众平台 (qq.com) 文档地址:微信开放文档 (qq.com) 2,注册后可以查看自己的appId 和 appsecret 3,创建模板 请注意: 1、测试模板的模板ID仅用于测试,不能用来给正式帐号发送模板消息 2、为方便测试,测试模

炫酷转换:Java实现Excel转换为图片的方法

摘要:本文由葡萄城技术团队原创并首发。转载请注明出处:葡萄城官网,葡萄城为开发者提供专业的开发工具、解决方案和服务,赋能开发者。 前言 在实际开发过程中,经常会有这样的需求:将Excel表格或特定区域转换为图片,以便在其他软件中使用。而在Java开发中,借助于报表插件可以轻松地将工作表、任意指定区域

SPI在Java中的实现与应用 | 京东物流技术团队

1 SPI的概念 API API在我们日常开发工作中是比较直观可以看到的,比如在 Spring 项目中,我们通常习惯在写 service 层代码前,添加一个接口层,对于 service 的调用一般也都是基于接口操作,通过依赖注入,可以使用接口实现类的实例。 简单形容就是这样的: 图1:API 如上图