网络五层协议中,试问 RPC 在第几层?
| 五层协议 |
|---|
| 应用层 |
| 传输层 |
| 网络层 |
| 链路层 |
| 物理层 |
进程协作完成任务时,通常需要相互通信。
本质上通信的方式分为本地和网络。
本地通信方式主要有两种形式,第一种是如命名管道基于操作系统暴漏的读写来操作,第二种是如锁机制的基于共享内存的线程队列阻塞机制(内存屏障)。
文章将要介绍一种网络通信方式,实现多台主机上进程相互通信的方式,是计算机网络的应用,是分布式的基础。
接口定义
rpc是远程过程调用,通过存根来进行数据定义,通过网络主机和端口建立传输层的连接和传。
首先先来看下存根,这个非常好理解,因为他其实就是空实现的本地方法调用。
public class EchoService {
public static EchoRsp EchoSvr(EchoReq req) {
return new EchoRsp("echo:" + req.content);
}
public static void main(String[] args) {
System.out.println(EchoService.EchoSvr(new EchoReq("hello")).content);
}
}
class EchoReq {
String content;
public EchoReq(String content) {
this.content = content;
}
}
class EchoRsp {
String content;
public EchoRsp(String content) {
this.content = content;
}
}
这是一个回声服务,将传入的数据简单处理后回传。
public static EchoRsp EchoSvr(EchoReq req) throws Exception {
throw new UnsupportedOperationException();
}
存根相当于客户端并没有回声服务的实现,他只是提供了客户端需要关心的请求和结果。
编码解码
之前说到我们已经可以拿掉请求和结果了,接下来需要实现数据的序列化方式,用于数据在流上的传输。
class EchoReq implements Serializable {
String content;
public EchoReq(String content) {
this.content = content;
}
}
class EchoRsp implements Serializable {
String content;
public EchoRsp(String content) {
this.content = content;
}
}
序列化方式使用了Serializable,是语言默认的一种序列化的方式,使用十分简单,只需要实现序列化的接口即可。
定义好序列化方式,还需要定义那些数据在流上传输。
public class Header implements Serializable {
String stub;
String method;
public Header(String stub, String method) {
this.stub = stub;
this.method = method;
}
}
Header 用于制定存根和调用的方法,因此需要在流上传输。除了header外,还应该有服务的请求requset和服务的响应response。
public class Codec {
InputStream inputStream;
OutputStream outputStream;
public Codec(InputStream inputStream, OutputStream outputStream) {
this.inputStream = inputStream;
this.outputStream = outputStream;
}
public Header Header() throws Exception {
return (Header) new ObjectInputStream(inputStream).readObject();
}
public Object Read() throws Exception {
return new ObjectInputStream(inputStream).readObject();
}
public void Write(Header header, Object obj) throws Exception {
new ObjectOutputStream(outputStream).writeObject(header);
new ObjectOutputStream(outputStream).writeObject(obj);
}
}
定义好传输哪些数据后,编码解码器就好实现了。
服务端
在 rpc 中服务端承载了监听本机端口,并与客户端建立连接并通信数据。
ServerSocket serverSocket = new ServerSocket(8080);
Socket socket = serverSocket.accept();
通过 socket 方式,很容易监听本机的端口,没有客户端连接时会阻塞到 accept。
Codec cc = new Codec(socket.getInputStream(), socket.getOutputStream());
Header header = cc.Header();
Class> stub = Class.forName(header.stub);
Map methods = Arrays.asList(stub.getDeclaredMethods()).stream()
.collect(Collectors.toMap(t -> t.getName(), t -> t));
Method method = methods.get(header.method);
cc.Write(header, method.invoke(null, header, cc.Read()));
服务端通过编码器可以读出客户端要调用的存根和方法,在通过反射的方式可以加载对应的类并进行调用。
现在已经实现了基本的服务端,但是这个服务端的吞吐量受制,需要对客户端的请求排队处理。
ExecutorService iopool = Executors.newFixedThreadPool(100);
Socket socket = serverSocket.accept();
iopool.submit(() -> {
// ...
});
对于网络请求这种密集io的应用,通常使用池化技术来处理,并发多个线程就可以成倍加快吞吐量。
客户端
大部分客户端都是支持水平扩容的,对客户端来说单机客户端池化加长连接几乎可以覆盖大部分场景。
class Call {
Long seq;
Object req;
Object rsp;
Thread thread;
public Call(Long seq, Object req) {
this.seq = seq;
this.req = req;
this.thread = Thread.currentThread();
}
}
对调用抽象,每个调用需要绑定线程并阻塞,在服务端处理后,会通过标识符回调通知,唤醒阻塞线程。
public class Client {
Long seq;
InputStream inputStream;
OutputStream outputStream;
ReentrantLock clock;
ReentrantLock metux;
Map calls;
public Client(InputStream inputStream, OutputStream outputStream) {
this.inputStream = inputStream;
this.outputStream = outputStream;
seq = 0L;
clock = new ReentrantLock();
metux = new ReentrantLock();
calls = new HashMap<>();
new Thread(() -> {
receive();
}).start();
}
}
客户端应该有标识,对共享资源有互斥锁,对流的读写也需要互斥执行。
FutureTask
客户端可以通过发送方法,发送数据到服务端,发送数据后将发送的数据的线程阻塞。
void receive() {
try {
for (;;) {
Header header = (Header) new ObjectInputStream(inputStream).readObject();
Call call = calls.remove(header.seq);
Object rsp = new ObjectInputStream(inputStream).readObject();
call.rsp = rsp;
LockSupport.unpark(call.thread);
}
} catch (Exception e) {
if (e instanceof SocketException) {
return;
}
e.printStackTrace();
}
}
每一个客户端都需要单独的线程去接受服务端回调,并唤醒发起请求时阻塞的线程。
Call register(Call call) {
try {
clock.lock();
call.seq = seq;
calls.put(seq, call);
seq++;
return call;
} finally {
clock.unlock();
}
}
Call remove(Call call) {
try {
clock.lock();
call.seq = seq;
calls.remove(seq);
return call;
} finally {
clock.unlock();
}
}
calls是一段连续的共享内存,对其实现注册和移除的方法。
多路复用
线程增加后,同时也增加处理机切换上下文的成本,并不是每个时刻都会有活动的网络通信,对每个连接单独起一个线程管理确实有点浪费。
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverChannel.socket();
serverSocket.bind(new InetSocketAddress(8080));
serverChannel.configureBlocking(false);
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
for (; selector.select(0) > 0;) {
Iterator it = selector.selectedKeys().iterator();
for (; it.hasNext();) {
SelectionKey key = it.next();
it.remove();
// ...
}
}
nio 的多路复用技术,允许只用一个线程,可以监听百万长连接。
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sChannel = channel.accept();
sChannel.configureBlocking(false);
sChannel.register(selector, SelectionKey.OP_READ);
}
连接存在两种状态,其中之一就是有连接请求建立,建立连接后,应该把这连接重新放到多路复用池中。
多路复用没有阻塞的读,这就要求我们必须考虑粘包类问题。
可以先研究下是否可以根据数据本身来做定界符。
[-84, -19, 0, 5, 115, 114, 0, 6, 72, 101, 97, 100, 101, 114, 4, 3, -46, 102, -12, -24, -29, -101, 2, 0, 3, 76, 0, 6, 109, 101, 116, 104, 111, 100, 116, 0, 18, 76, 106]
[97, 118, 97, 47, 108, 97, 110, 103, 47, 83, 116, 114, 105, 110, 103, 59, 76, 0, 3, 115, 101, 113, 116, 0, 16, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 76]
[111, 110, 103, 59, 76, 0, 4, 115, 116, 117, 98, 113, 0, 126, 0, 1, 120, 112, 116, 0, 4, 101, 99, 104, 111, 115, 114, 0, 14, 106, 97, 118, 97, 46, 108, 97, 110, 103]
[46, 76, 111, 110, 103, 59, -117, -28, -112, -52, -113, 35, -33, 2, 0, 1, 74, 0, 5, 118, 97, 108, 117, 101, 120, 114, 0, 16, 106, 97, 118, 97, 46, 108, 97, 110, 103]
[46, 78, 117, 109, 98, 101, 114, -122, -84, -107, 29, 11, -108, -32, -117, 2, 0, 0, 120, 112, 0, 0, 0, 0, 0, 0, 0, 0, 116, 0, 11, 69, 99, 104, 111, 83, 101, 114, 118]
[105, 99, 101, -84, -19, 0, 5, 115, 114, 0, 11, 69, 99, 104, 111, 82, 101, 113, 117, 101, 115, 116, 124, -113, 104, 7, 112, -104, 46, -37, 2, 0, 1, 76, 0, 7, 99, 111]
[110, 116, 101, 110, 116, 116, 0, 18, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 83, 116, 114, 105, 110, 103, 59, 120, 112, 116, 0, 6, 126, 104, 101, 108, 108]
这里是序列化的数据,对其解读下。
- [-84, -19]魔数,ObjectStreamConstants.STREAM_MAGIC。
- [0, 5] 魔数,ObjectStreamConstants.STREAM_VERSION。
- [115, 114] 魔数,ObjectStreamConstants.TC_CLASSDESC、ObjectStreamConstants.TC_OBJECT。
- [0, 6] Header的长度。
感兴趣可自己解读,结果是并没有很好的方式去定界。
不难想到,几个避免粘包的方法。
- 定界与转义:优点:侵入性比较小。缺点:需要转义原来的特殊字符。(TCP链路层、netty)
- 消息体长度:优点:开始和结束非常明确。缺点:侵入性比较大。(netty)
既然已经有侵入了,无所谓侵入大小,侵入的数据最好有明确的意义就更好。
class NonblockOpt implements Consumer
客户端改造,在写入真正的数据前,先写入四字节的长度信息。
ByteBuffer buffer = ByteBuffer.allocate(4);
buffer.clear();
for (; buffer.position() != 4;) {
int read = socketChannel.read(buffer);
if (read == -1) {
key.cancel();
break;
}
}
DataInputStream dataInputStream = new DataInputStream(
new ByteArrayInputStream(buffer.array()));
int readInt = dataInputStream.readInt();
buffer = ByteBuffer.allocate(readInt);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
buffer.clear();
for (; buffer.position() != readInt;) {
socketChannel.read(buffer);
}
byteArrayOutputStream.write(buffer.array());
InputStream inputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
Codec cc = new Codec(inputStream, new ByteArrayOutputStream());
服务端逻辑,编码器不再是socket的输入输出流了,因为阻塞式的socket并不能直接处理非阻塞的数据,需要在线程内读取完毕后,通过转换成阻塞式的io操作写入。
模拟了100个客户端去请求阻塞和非阻塞服务端,每个客户端请求3次后关闭,测试结果如下。
| 阻塞 | 时间 |
|---|---|
| 是 | 1024 |
| 否 | 359 |