• java实现朴素rpc


    网络五层协议中,试问 RPC 在第几层?

    五层协议
    应用层
    传输层
    网络层
    链路层
    物理层

    进程协作完成任务时,通常需要相互通信。

    本质上通信的方式分为本地和网络。

    本地通信方式主要有两种形式,第一种是如命名管道基于操作系统暴漏的读写来操作,第二种是如锁机制的基于共享内存的线程队列阻塞机制(内存屏障)。

    文章将要介绍一种网络通信方式,实现多台主机上进程相互通信的方式,是计算机网络的应用,是分布式的基础。

    接口定义

    rpc是远程过程调用,通过存根来进行数据定义,通过网络主机和端口建立传输层的连接和传。

    首先先来看下存根,这个非常好理解,因为他其实就是空实现的本地方法调用。

    public class EchoService {
    
    	public static EchoRsp EchoSvr(EchoReq req) {
    		return new EchoRsp("echo:" + req.content);
    	}
    
    	public static void main(String[] args) {
    		System.out.println(EchoService.EchoSvr(new EchoReq("hello")).content);
    	}
    }
    
    class EchoReq {
    	String content;
    
    	public EchoReq(String content) {
    		this.content = content;
    	}
    }
    
    class EchoRsp {
    	String content;
    
    	public EchoRsp(String content) {
    		this.content = content;
    	}
    }
    

    这是一个回声服务,将传入的数据简单处理后回传。

    public static EchoRsp EchoSvr(EchoReq req) throws Exception {
    	throw new UnsupportedOperationException();
    }
    

    存根相当于客户端并没有回声服务的实现,他只是提供了客户端需要关心的请求和结果。

    编码解码

    之前说到我们已经可以拿掉请求和结果了,接下来需要实现数据的序列化方式,用于数据在流上的传输。

    class EchoReq implements Serializable {
    	String content;
    
    	public EchoReq(String content) {
    		this.content = content;
    	}
    }
    
    class EchoRsp implements Serializable {
    	String content;
    
    	public EchoRsp(String content) {
    		this.content = content;
    	}
    }
    

    序列化方式使用了Serializable,是语言默认的一种序列化的方式,使用十分简单,只需要实现序列化的接口即可。

    定义好序列化方式,还需要定义那些数据在流上传输。

    public class Header implements Serializable {
    	String stub;
    	String method;
    
    	public Header(String stub, String method) {
    		this.stub = stub;
    		this.method = method;
    	}
    }
    

    Header 用于制定存根和调用的方法,因此需要在流上传输。除了header外,还应该有服务的请求requset和服务的响应response

    public class Codec {
    	InputStream inputStream;
    	OutputStream outputStream;
    
    	public Codec(InputStream inputStream, OutputStream outputStream) {
    		this.inputStream = inputStream;
    		this.outputStream = outputStream;
    	}
    
    	public Header Header() throws Exception {
    		return (Header) new ObjectInputStream(inputStream).readObject();
    	}
    
    	public Object Read() throws Exception {
    		return new ObjectInputStream(inputStream).readObject();
    	}
    
    	public void Write(Header header, Object obj) throws Exception {
    		new ObjectOutputStream(outputStream).writeObject(header);
    		new ObjectOutputStream(outputStream).writeObject(obj);
    	}
    }
    

    定义好传输哪些数据后,编码解码器就好实现了。

    服务端

    rpc 中服务端承载了监听本机端口,并与客户端建立连接并通信数据。

    ServerSocket serverSocket = new ServerSocket(8080);
    Socket socket = serverSocket.accept();
    

    通过 socket 方式,很容易监听本机的端口,没有客户端连接时会阻塞到 accept

    Codec cc = new Codec(socket.getInputStream(), socket.getOutputStream());
    Header header = cc.Header();
    Class stub = Class.forName(header.stub);
    Map methods = Arrays.asList(stub.getDeclaredMethods()).stream()
    	.collect(Collectors.toMap(t -> t.getName(), t -> t));
    Method method = methods.get(header.method);
    cc.Write(header, method.invoke(null, header, cc.Read()));
    

    服务端通过编码器可以读出客户端要调用的存根和方法,在通过反射的方式可以加载对应的类并进行调用。

    现在已经实现了基本的服务端,但是这个服务端的吞吐量受制,需要对客户端的请求排队处理。

    ExecutorService iopool = Executors.newFixedThreadPool(100);
    Socket socket = serverSocket.accept();
    iopool.submit(() -> {
      // ...
    });
    

    对于网络请求这种密集io的应用,通常使用池化技术来处理,并发多个线程就可以成倍加快吞吐量。

    客户端

    大部分客户端都是支持水平扩容的,对客户端来说单机客户端池化加长连接几乎可以覆盖大部分场景。

    class Call {
    	Long seq;
    	Object req;
    	Object rsp;
    	Thread thread;
    
    	public Call(Long seq, Object req) {
    		this.seq = seq;
    		this.req = req;
    		this.thread = Thread.currentThread();
    	}
    }
    

    对调用抽象,每个调用需要绑定线程并阻塞,在服务端处理后,会通过标识符回调通知,唤醒阻塞线程。

    public class Client {
    	Long seq;
    	InputStream inputStream;
    	OutputStream outputStream;
    	ReentrantLock clock;
    	ReentrantLock metux;
    	Map calls;
    
    	public Client(InputStream inputStream, OutputStream outputStream) {
    		this.inputStream = inputStream;
    		this.outputStream = outputStream;
    		seq = 0L;
    		clock = new ReentrantLock();
    		metux = new ReentrantLock();
    		calls = new HashMap<>();
    		new Thread(() -> {
    			receive();
    		}).start();
    	}
    }
    

    客户端应该有标识,对共享资源有互斥锁,对流的读写也需要互斥执行。

    FutureTask send(Header header, Object req) {
    	Call call = new Call(seq, req);
    	try {
    		metux.lock();
    		final Call fcall = register(call);
    		header.seq = call.seq;
    		new ObjectOutputStream(outputStream).writeObject(header);
    		new ObjectOutputStream(outputStream).writeObject(req);
    		FutureTask task = new FutureTask<>(() -> {
    			fcall.thread = Thread.currentThread();
    			LockSupport.park();
    			return fcall.rsp;
    		});
    		task.run();
    		return task;
    	} catch (Exception e) {
    		e.printStackTrace();
    		return null;
    	} finally {
    		metux.unlock();
    	}
    }
    
    

    客户端可以通过发送方法,发送数据到服务端,发送数据后将发送的数据的线程阻塞。

    void receive() {
    	try {
    		for (;;) {
    			Header header = (Header) new ObjectInputStream(inputStream).readObject();
    			Call call = calls.remove(header.seq);
    			Object rsp = new ObjectInputStream(inputStream).readObject();
    			call.rsp = rsp;
    			LockSupport.unpark(call.thread);
    		}
    	} catch (Exception e) {
    		if (e instanceof SocketException) {
    			return;
    		}
    		e.printStackTrace();
    	}
    }
    

    每一个客户端都需要单独的线程去接受服务端回调,并唤醒发起请求时阻塞的线程。

    Call register(Call call) {
    	try {
    		clock.lock();
    		call.seq = seq;
    		calls.put(seq, call);
    		seq++;
    		return call;
    	} finally {
    		clock.unlock();
    	}
    }
    
    Call remove(Call call) {
    	try {
    		clock.lock();
    		call.seq = seq;
    		calls.remove(seq);
    		return call;
    	} finally {
    		clock.unlock();
    	}
    }
    

    calls是一段连续的共享内存,对其实现注册和移除的方法。

    多路复用

    线程增加后,同时也增加处理机切换上下文的成本,并不是每个时刻都会有活动的网络通信,对每个连接单独起一个线程管理确实有点浪费。

    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    ServerSocket serverSocket = serverChannel.socket();
    serverSocket.bind(new InetSocketAddress(8080));
    serverChannel.configureBlocking(false);
    Selector selector = Selector.open();
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    for (; selector.select(0) > 0;) {
    	Iterator it = selector.selectedKeys().iterator();
    	for (; it.hasNext();) {
    		SelectionKey key = it.next();
    		it.remove();
    		// ...
    	}
    }
    

    nio 的多路复用技术,允许只用一个线程,可以监听百万长连接。

    if (key.isAcceptable()) {
    	ServerSocketChannel channel = (ServerSocketChannel) key.channel();
    	SocketChannel sChannel = channel.accept();
    	sChannel.configureBlocking(false);
    	sChannel.register(selector, SelectionKey.OP_READ);
    }
    

    连接存在两种状态,其中之一就是有连接请求建立,建立连接后,应该把这连接重新放到多路复用池中。

    多路复用没有阻塞的读,这就要求我们必须考虑粘包类问题。

    可以先研究下是否可以根据数据本身来做定界符。

    [-84, -19, 0, 5, 115, 114, 0, 6, 72, 101, 97, 100, 101, 114, 4, 3, -46, 102, -12, -24, -29, -101, 2, 0, 3, 76, 0, 6, 109, 101, 116, 104, 111, 100, 116, 0, 18, 76, 106]
    [97, 118, 97, 47, 108, 97, 110, 103, 47, 83, 116, 114, 105, 110, 103, 59, 76, 0, 3, 115, 101, 113, 116, 0, 16, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 76]
    [111, 110, 103, 59, 76, 0, 4, 115, 116, 117, 98, 113, 0, 126, 0, 1, 120, 112, 116, 0, 4, 101, 99, 104, 111, 115, 114, 0, 14, 106, 97, 118, 97, 46, 108, 97, 110, 103]
    [46, 76, 111, 110, 103, 59, -117, -28, -112, -52, -113, 35, -33, 2, 0, 1, 74, 0, 5, 118, 97, 108, 117, 101, 120, 114, 0, 16, 106, 97, 118, 97, 46, 108, 97, 110, 103]
    [46, 78, 117, 109, 98, 101, 114, -122, -84, -107, 29, 11, -108, -32, -117, 2, 0, 0, 120, 112, 0, 0, 0, 0, 0, 0, 0, 0, 116, 0, 11, 69, 99, 104, 111, 83, 101, 114, 118]
    [105, 99, 101, -84, -19, 0, 5, 115, 114, 0, 11, 69, 99, 104, 111, 82, 101, 113, 117, 101, 115, 116, 124, -113, 104, 7, 112, -104, 46, -37, 2, 0, 1, 76, 0, 7, 99, 111]
    [110, 116, 101, 110, 116, 116, 0, 18, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 83, 116, 114, 105, 110, 103, 59, 120, 112, 116, 0, 6, 126, 104, 101, 108, 108]
    

    这里是序列化的数据,对其解读下。

    • [-84, -19]魔数,ObjectStreamConstants.STREAM_MAGIC。
    • [0, 5] 魔数,ObjectStreamConstants.STREAM_VERSION。
    • [115, 114] 魔数,ObjectStreamConstants.TC_CLASSDESC、ObjectStreamConstants.TC_OBJECT。
    • [0, 6] Header的长度。

    感兴趣可自己解读,结果是并没有很好的方式去定界。

    不难想到,几个避免粘包的方法。

    • 定界与转义:优点:侵入性比较小。缺点:需要转义原来的特殊字符。(TCP链路层、netty)
    • 消息体长度:优点:开始和结束非常明确。缺点:侵入性比较大。(netty)

    既然已经有侵入了,无所谓侵入大小,侵入的数据最好有明确的意义就更好。

    class NonblockOpt implements Consumer> {
    
    	@Override
    	public void accept(Map m) {
    		try {
    			Header header = (Header) m.get("header");
    			Object req = (Object) m.get("req");
    			OutputStream outputStream = (OutputStream) m.get("outputStream");
    			ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    			new ObjectOutputStream(byteArrayOutputStream).writeObject(header);
    			new ObjectOutputStream(byteArrayOutputStream).writeObject(req);
    			new DataOutputStream(outputStream).writeInt(byteArrayOutputStream.toByteArray().length);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    }
    
    FutureTask send(Header header, Object req, Consumer> opt) {
    	// ...
    	if (opt != null) {
    		HashMap m = new HashMap();
    		m.put("header", header);
    		m.put("req", req);
    		m.put("outputStream", outputStream);
    		opt.accept(m);
    	}
    	// ...
    }
    
    

    客户端改造,在写入真正的数据前,先写入四字节的长度信息。

    ByteBuffer buffer = ByteBuffer.allocate(4);
    buffer.clear();
    for (; buffer.position() != 4;) {
    	int read = socketChannel.read(buffer);
    	if (read == -1) {
    		key.cancel();
    		break;
    	}
    }
    
    DataInputStream dataInputStream = new DataInputStream(
    	new ByteArrayInputStream(buffer.array()));
    int readInt = dataInputStream.readInt();
    
    buffer = ByteBuffer.allocate(readInt);
    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    buffer.clear();
    for (; buffer.position() != readInt;) {
    	socketChannel.read(buffer);
    }
    byteArrayOutputStream.write(buffer.array());
    InputStream inputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
    Codec cc = new Codec(inputStream, new ByteArrayOutputStream());
    

    服务端逻辑,编码器不再是socket的输入输出流了,因为阻塞式的socket并不能直接处理非阻塞的数据,需要在线程内读取完毕后,通过转换成阻塞式的io操作写入。

    模拟了100个客户端去请求阻塞和非阻塞服务端,每个客户端请求3次后关闭,测试结果如下。

    阻塞 时间
    1024
    359
  • 相关阅读:
    web学习---JavaScript---笔记(一)
    Zigbee 入网过程详解
    服务器监控软件(一 、大致讲解篇)
    YOLOv8改进 | 2023 | InnerIoU、InnerSIoU、InnerWIoU、FoucsIoU等损失函数
    C语言之指针
    原生php 实现redis登录五次被禁,隔天再登陆
    二叉排序树的创建与添加操作(思路分析)
    期货开户有什么规定
    STM32CubeMX实战教程(九)——外部SRAM+内存管理
    Linux(Centos7)服务器中配置Mysql主从数据库,以及数据库的安装,防火墙操作
  • 原文地址:https://www.cnblogs.com/aolob/p/java-rpc.html