• Netty深入浅出Java网络编程学习笔记(三) 优化篇


    目录

    五、优化

    1、拓展序列化算法

    序列化接口

    枚举实现类

    修改原编解码器

    2、参数调优

    CONNECT_TIMEOUT_MILLIS

    使用

    源码分析

    SO_BACKLOG

    三次握手与连接队列

    作用

    默认值

    TCP_NODELAY

    SO_SNDBUF & SO_RCVBUF

    ALLOCATOR

    使用

    ByteBufAllocator类型

    RCVBUF_ALLOCATOR

    3、RPC框架

    准备工作

    RpcRequestMessageHandler

    RpcResponseMessageHandler

    客户端发送消息

    改进客户端

    改进RpcResponseMessageHandler


    五、优化

    1、拓展序列化算法

    序列化,反序列化主要用在消息正文的转换上

    • 序列化时,需要将 Java 对象变为要传输的数据(可以是 byte[],或 json 等,最终都需要变成 byte[])

    • 反序列化时,需要将传入的正文数据还原成 Java 对象,便于处理

    序列化接口

    1. public interface Serializer {
    2. /**
    3. * 序列化
    4. * @param object 被序列化的对象
    5. * @param 被序列化对象类型
    6. * @return 序列化后的字节数组
    7. */
    8. byte[] serialize(T object);
    9. /**
    10. * 反序列化
    11. * @param clazz 反序列化的目标类的Class对象
    12. * @param bytes 被反序列化的字节数组
    13. * @param 反序列化目标类
    14. * @return 反序列化后的对象
    15. */
    16. T deserialize(Class clazz, byte[] bytes);
    17. }

    枚举实现类

    积累一下这种枚举的运用方式

    1. public enum SerializerAlgorithm implements Serializer {
    2. // Java的序列化和反序列化
    3. Java {
    4. @Override
    5. public byte[] serialize(T object) {
    6. // 序列化后的字节数组
    7. byte[] bytes = null;
    8. try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
    9. ObjectOutputStream oos = new ObjectOutputStream(bos)) {
    10. oos.writeObject(object);
    11. bytes = bos.toByteArray();
    12. } catch (IOException e) {
    13. e.printStackTrace();
    14. }
    15. return bytes;
    16. }
    17. @Override
    18. public T deserialize(Class clazz, byte[] bytes) {
    19. T target = null;
    20. System.out.println(Arrays.toString(bytes));
    21. try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
    22. ObjectInputStream ois = new ObjectInputStream(bis)) {
    23. target = (T) ois.readObject();
    24. } catch (IOException | ClassNotFoundException e) {
    25. e.printStackTrace();
    26. }
    27. // 返回反序列化后的对象
    28. return target;
    29. }
    30. }
    31. // Json的序列化和反序列化
    32. Json {
    33. @Override
    34. public byte[] serialize(T object) {
    35. String s = new Gson().toJson(object);
    36. System.out.println(s);
    37. // 指定字符集,获得字节数组
    38. return s.getBytes(StandardCharsets.UTF_8);
    39. }
    40. @Override
    41. public T deserialize(Class clazz, byte[] bytes) {
    42. String s = new String(bytes, StandardCharsets.UTF_8);
    43. System.out.println(s);
    44. // 此处的clazz为具体类型的Class对象,而不是父类Message的
    45. return new Gson().fromJson(s, clazz);
    46. }
    47. }
    48. }

    修改原编解码器

    编码

    1. // 获得序列化后的msg
    2. /* 使用指定的序列化方式 SerializerAlgorithm.values();是获取一个枚举类中
    3. 的对象下标,如上的枚举中,Java对象的获取可以为SerializerAlgorithm.values()[0];
    4. */
    5. SerializerAlgorithm[] values = SerializerAlgorithm.values();
    6. // 获得序列化后的对象
    7. byte[] bytes = values[out.getByte(5)-1].serialize(msg);

    解码

    1. // 获得反序列化方式
    2. SerializerAlgorithm[] values = SerializerAlgorithm.values();
    3. // 通过指定方式进行反序列化
    4. // 需要通过Message的方法获得具体的消息类型
    5. Message message = values[seqType-1].deserialize(Message.getMessageClass(messageType), bytes);

    2、参数调优

    CONNECT_TIMEOUT_MILLIS

    • 属于 SocketChannal 的参数
    • 用在客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
    • 注意:Netty 中不要用成了SO_TIMEOUT 主要用在阻塞 IO,而 Netty 是非阻塞 IO
    使用
    1. public class TestParam {
    2. public static void main(String[] args) {
    3. // SocketChannel 5s内未建立连接就抛出异常
    4. new Bootstrap().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
    5. // ServerSocketChannel 5s内未建立连接就抛出异常
    6. new ServerBootstrap().option(ChannelOption.CONNECT_TIMEOUT_MILLIS,5000);
    7. // SocketChannel 5s内未建立连接就抛出异常
    8. new ServerBootstrap().childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
    9. }
    10. }

    注意SocketChannel  和 ServerSocketChannel 的区别

    • 客户端通过 Bootstrap.option 函数来配置参数,配置参数作用于 SocketChannel
    • 服务器通过 ServerBootstrap来配置参数,但是对于不同的 Channel 需要选择不同的方法
      • 通过 option 来配置 ServerSocketChannel 上的参数
      • 通过 childOption 来配置 SocketChannel 上的参数

    源码分析

    客户端中连接服务器的线程是 NIO 线程,抛出异常的是主线程。这是如何做到超时判断以及线程通信的呢

    AbstractNioChannel.AbstractNioUnsafe.connect方法中

    1. public final void connect(
    2. final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    3. ...
    4. // Schedule connect timeout.
    5. // 设置超时时间,通过option方法传入的CONNECT_TIMEOUT_MILLIS参数进行设置
    6. int connectTimeoutMillis = config().getConnectTimeoutMillis();
    7. // 如果超时时间大于0
    8. if (connectTimeoutMillis > 0) {
    9. // 创建一个定时任务,延时connectTimeoutMillis(设置的超时时间时间)后执行
    10. // schedule(Runnable command, long delay, TimeUnit unit)
    11. connectTimeoutFuture = eventLoop().schedule(new Runnable() {
    12. @Override
    13. public void run() {
    14. // 判断是否建立连接,Promise进行NIO线程与主线程之间的通信
    15. // 如果超时,则通过tryFailure方法将异常放入Promise中
    16. // 在主线程中抛出
    17. ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
    18. ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress);
    19. if (connectPromise != null && connectPromise.tryFailure(cause)) {
    20. close(voidPromise());
    21. }
    22. }
    23. }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
    24. }
    25. ...
    26. }

    超时的判断主要是通过 Eventloop 的 schedule 方法 + Promise 共同实现的

    • schedule 设置了一个定时任务,延迟connectTimeoutMillis秒后执行该方法
    • 如果指定时间内没有建立连接,则会执行其中的任务
      • 任务负责创建 ConnectTimeoutException 异常,并将异常通过 Pormise 传给主线程并抛

    SO_BACKLOG

    该参数是 ServerSocketChannel 的参数

    三次握手与连接队列

    第一次握手时,因为客户端与服务器之间的连接还未完全建立,连接会被放入半连接队列

    当完成三次握手以后,连接会被放入全连接队列中

    服务器处理Accept事件是在TCP三次握手,也就是建立连接之后。服务器会从全连接队列中获取连接并进行处理

    在 linux 2.2 之前,backlog 大小包括了两个队列的大小,在 linux 2.2 之后,分别用下面两个参数来控制

    • 半连接队列 - sync queue
      • 大小通过 /proc/sys/net/ipv4/tcp_max_syn_backlog 指定,在 syncookies 启用的情况下,逻辑上没有最大值限制,这个设置便被忽略
    • 全连接队列 - accept queue
      • 其大小通过 /proc/sys/net/core/somaxconn 指定,在使用 listen 函数时,内核会根据传入的 backlog 参数与系统参数,取二者的较小值
      • 如果 accpet queue 队列满了,server 将发送一个拒绝连接的错误信息到 client
    作用

    在Netty中,SO_BACKLOG主要用于设置全连接队列的大小。当处理Accept的速率小于连接建立的速率时,全连接队列中堆积的连接数大于SO_BACKLOG设置的值时,便会抛出异常

    设置方式如下

    1. // 设置全连接队列,大小为2
    2. new ServerBootstrap().option(ChannelOption.SO_BACKLOG, 2);
    默认值

    backlog参数在NioSocketChannel.doBind方法被使用

    1. @Override
    2. protected void doBind(SocketAddress localAddress) throws Exception {
    3. if (PlatformDependent.javaVersion() >= 7) {
    4. javaChannel().bind(localAddress, config.getBacklog());
    5. } else {
    6. javaChannel().socket().bind(localAddress, config.getBacklog());
    7. }
    8. }

    其中backlog被保存在了DefaultServerSocketChannelConfig配置类中

    private volatile int backlog = NetUtil.SOMAXCONN;

    具体的赋值操作如下

    1. SOMAXCONN = AccessController.doPrivileged(new PrivilegedAction() {
    2. @Override
    3. public Integer run() {
    4. // Determine the default somaxconn (server socket backlog) value of the platform.
    5. // The known defaults:
    6. // - Windows NT Server 4.0+: 200
    7. // - Linux and Mac OS X: 128
    8. int somaxconn = PlatformDependent.isWindows() ? 200 : 128;
    9. File file = new File("/proc/sys/net/core/somaxconn");
    10. BufferedReader in = null;
    11. try {
    12. // file.exists() may throw a SecurityException if a SecurityManager is used, so execute it in the
    13. // try / catch block.
    14. // See https://github.com/netty/netty/issues/4936
    15. if (file.exists()) {
    16. in = new BufferedReader(new FileReader(file));
    17. // 将somaxconn设置为Linux配置文件中设置的值
    18. somaxconn = Integer.parseInt(in.readLine());
    19. if (logger.isDebugEnabled()) {
    20. logger.debug("{}: {}", file, somaxconn);
    21. }
    22. } else {
    23. ...
    24. }
    25. ...
    26. }
    27. // 返回backlog的值
    28. return somaxconn;
    29. }
    30. }
    • backlog的值会根据操作系统的不同,来选择不同的默认值
      • Windows 200
      • Linux/Mac OS 128
    • 如果配置文件/proc/sys/net/core/somaxconn存在,会读取配置文件中的值,并将backlog的值设置为配置文件中指定的

    TCP_NODELAY

    • 属于 SocketChannal 参数
    • 因为 Nagle 算法,数据包会堆积到一定的数量后一起发送,这就可能导致数据的发送存在一定的延时
    • 该参数默认为false,如果不希望的发送被延时,则需要将该值设置为true

    SO_SNDBUF & SO_RCVBUF

    • SO_SNDBUF 属于 SocketChannal 参数
    • SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
    • 该参数用于指定接收方与发送方的滑动窗口大小

    ALLOCATOR

    • 属于 SocketChannal 参数
    • 用来配置 ByteBuf 是池化还是非池化,是直接内存还是堆内存
    使用
    1. // 选择ALLOCATOR参数,设置SocketChannel中分配的ByteBuf类型
    2. // 第二个参数需要传入一个ByteBufAllocator,用于指定生成的 ByteBuf 的类型
    3. new ServerBootstrap().childOption(ChannelOption.ALLOCATOR, new PooledByteBufAllocator());
    ByteBufAllocator类型
    • 池化并使用直接内存

      1. // true表示使用直接内存
      2. new PooledByteBufAllocator(true);
    • 池化并使用堆内存

      1. // false表示使用堆内存
      2. new PooledByteBufAllocator(false);
    • 非池化并使用直接内存

      1. // ture表示使用直接内存
      2. new UnpooledByteBufAllocator(true);
    • 非池化并使用堆内存

      1. // false表示使用堆内存
      2. new UnpooledByteBufAllocator(false);

    RCVBUF_ALLOCATOR

    • 属于 SocketChannal 参数
    • 控制 Netty 接收缓冲区大小
    • 负责入站数据的分配,决定入站缓冲区的大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定
    1. 在 Netty 中,RCVBUF_ALLOCATOR 参数的作用是设置接收缓冲区分配
    2. 器。该参数决定了网络通道接收数据时的缓冲区分配策略。
    3. Netty 为了优化网络通信性能,使用了可扩展的缓冲区分配策略来处理
    4. 接收到的数据。而 RCVBUF_ALLOCATOR 参数就是用于配置这种缓冲区分配策略的。

    3、RPC框架

    准备工作

    在聊天室代码的基础上进行一定的改进

    Message中添加如下代码

    1. public abstract class Message implements Serializable {
    2. ...
    3. // 添加RPC消息类型
    4. public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
    5. public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
    6. static {
    7. // 将消息类型放入消息类对象Map中
    8. messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
    9. messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
    10. }
    11. }

    RPC请求消息

    1. @Data
    2. @AllControductor
    3. public class RpcRequestMessage extends Message {
    4. /**
    5. * 调用的接口全限定名,服务端根据它找到实现
    6. */
    7. private String interfaceName;
    8. /**
    9. * 调用接口中的方法名
    10. */
    11. private String methodName;
    12. /**
    13. * 方法返回类型
    14. */
    15. private Class returnType;
    16. /**
    17. * 方法参数类型数组
    18. */
    19. private Class[] parameterTypes;
    20. /**
    21. * 方法参数值数组
    22. */
    23. private Object[] parameterValue;
    24. }

    想要远程调用一个方法,必须知道以下五个信息

    • 方法所在的全限定类名
    • 方法名
    • 方法返回值类型
    • 方法参数类型
    • 方法参数值

    RPC响应消息

    1. @Data
    2. public class RpcResponseMessage extends Message {
    3. /**
    4. * 返回值
    5. */
    6. private Object returnValue;
    7. /**
    8. * 异常值
    9. */
    10. private Exception exceptionValue;
    11. }

    响应消息中只需要获取返回结果和异常值

    服务器

    1. public class RPCServer {
    2. public static void main(String[] args) {
    3. NioEventLoopGroup boss = new NioEventLoopGroup();
    4. NioEventLoopGroup worker = new NioEventLoopGroup();
    5. LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
    6. MessageSharableCodec messageSharableCodec = new MessageSharableCodec();
    7. // PRC 请求消息处理器
    8. RpcRequestMessageHandler rpcRequestMessageHandler = new RpcRequestMessageHandler();
    9. try {
    10. ServerBootstrap serverBootstrap = new ServerBootstrap();
    11. serverBootstrap.channel(NioServerSocketChannel.class);
    12. serverBootstrap.group(boss, worker);
    13. serverBootstrap.childHandler(new ChannelInitializer() {
    14. @Override
    15. protected void initChannel(SocketChannel ch) throws Exception {
    16. // 自定义协议粘半包处理器
    17. ch.pipeline().addLast(new ProtocolFrameDecoder());
    18. // 日志
    19. ch.pipeline().addLast(loggingHandler);
    20. // 协议编解码器
    21. ch.pipeline().addLast(messageSharableCodec);
    22. // rpc处理工人
    23. ch.pipeline().addLast(rpcRequestMessageHandler);
    24. }
    25. });
    26. Channel channel = serverBootstrap.bind(8080).sync().channel();
    27. channel.closeFuture().sync();
    28. } catch (InterruptedException e) {
    29. e.printStackTrace();
    30. } finally {
    31. boss.shutdownGracefully();
    32. worker.shutdownGracefully();
    33. }
    34. }
    35. }

    服务器中添加了处理RPCRequest消息的handler

    客户端

    1. public class RPCClient {
    2. public static void main(String[] args) {
    3. NioEventLoopGroup group = new NioEventLoopGroup();
    4. LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
    5. MessageSharableCodec messageSharableCodec = new MessageSharableCodec();
    6. // PRC 请求消息处理器
    7. RpcResponseMessageHandler rpcResponseMessageHandler = new RpcResponseMessageHandler();
    8. try {
    9. Bootstrap bootstrap = new Bootstrap();
    10. bootstrap.channel(NioSocketChannel.class);
    11. bootstrap.group(group);
    12. bootstrap.handler(new ChannelInitializer() {
    13. @Override
    14. protected void initChannel(SocketChannel ch) throws Exception {
    15. ch.pipeline().addLast(new ProtocolFrameDecoder());
    16. ch.pipeline().addLast(loggingHandler);
    17. ch.pipeline().addLast(messageSharableCodec);
    18. // rpc处理工人
    19. ch.pipeline().addLast(rpcResponseMessageHandler);
    20. }
    21. });
    22. Channel channel = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync().channel();
    23. channel.closeFuture().sync();
    24. } catch (InterruptedException e) {
    25. e.printStackTrace();
    26. } finally {
    27. group.shutdownGracefully();
    28. }
    29. }
    30. }

    通过接口Class获取实例对象的Factory

    1. public class ServicesFactory {
    2. static HashMap, Object> map = new HashMap<>(16);
    3. public static Object getInstance(Class interfaceClass) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
    4. // 根据Class创建实例
    5. try {
    6. Class clazz = Class.forName("cn.nyimac.study.day8.server.service.HelloService");
    7. Object instance = Class.forName("cn.nyimac.study.day8.server.service.HelloServiceImpl").newInstance();
    8. // 放入 InterfaceClass -> InstanceObject 的映射
    9. map.put(clazz, instance);
    10. } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
    11. e.printStackTrace();
    12. }
    13. return map.get(interfaceClass);
    14. }
    15. }

    RpcRequestMessageHandler

    1. @ChannelHandler.Sharable
    2. public class RpcRequestMessageHandler extends SimpleChannelInboundHandler {
    3. @Override
    4. protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage rpcMessage) {
    5. RpcResponseMessage rpcResponseMessage = new RpcResponseMessage();
    6. try {
    7. // 设置返回值的属性
    8. rpcResponseMessage.setSequenceId(rpcMessage.getSequenceId());
    9. // 返回一个实例
    10. HelloService service = (HelloService) ServicesFactory.getInstance(Class.forName(rpcMessage.getInterfaceName()));
    11. // 通过反射调用方法,并获取返回值
    12. Method method = service.getClass().getMethod(rpcMessage.getMethodName(), rpcMessage.getParameterTypes());
    13. // 获得返回值
    14. Object invoke = method.invoke(service, rpcMessage.getParameterValue());
    15. // 设置返回值
    16. rpcResponseMessage.setReturnValue(invoke);
    17. } catch (Exception e) {
    18. e.printStackTrace();
    19. // 设置异常
    20. rpcResponseMessage.setExceptionValue(e);
    21. }
    22. }
    23. // 向channel中写入Message
    24. ctx.writeAndFlush(rpcResponseMessage);
    25. }

    远程调用方法主要是通过反射实现的,大致步骤如下

    • 通过请求消息传入被调入方法的各个参数
    • 通过全限定接口名,在map中查询到对应的类并实例化对象
    • 通过反射获取Method,将请求消息的参数传入并调用其invoke方法,返回值放入响应消息中
    • 若有异常需要捕获,并放入响应消息中

    RpcResponseMessageHandler

    1. @ChannelHandler.Sharable
    2. public class RpcResponseMessageHandler extends SimpleChannelInboundHandler {
    3. static final Logger log = LoggerFactory.getLogger(ChatServer.class);
    4. @Override
    5. protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
    6. log.debug("{}", msg);
    7. System.out.println((String)msg.getReturnValue());
    8. }
    9. }

    客户端发送消息

    1. public class RPCClient {
    2. public static void main(String[] args) {
    3. ...
    4. // 创建请求并发送
    5. RpcRequestMessage message = new RpcRequestMessage(1,
    6. "cn.nyimac.study.day8.server.service.HelloService",
    7. "sayHello",
    8. String.class,
    9. new Class[]{String.class},
    10. new Object[]{"Nyima"});
    11. channel.writeAndFlush(message);
    12. ...
    13. }
    14. }

    运行结果

    客户端

    1606 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.server.ChatServer  - RpcResponseMessage{returnValue=你好,Nyima, exceptionValue=null}

    改进客户端

    1. public class RPCClientManager {
    2. /**
    3. * 产生SequenceId
    4. */
    5. private static AtomicInteger sequenceId = new AtomicInteger(0);
    6. private static volatile Channel channel = null;
    7. private static final Object lock = new Object();
    8. public static void main(String[] args) {
    9. // 创建代理对象
    10. HelloService service = (HelloService) getProxy(HelloService.class);
    11. // 通过代理对象执行方法
    12. System.out.println(service.sayHello("Nyima"));
    13. System.out.println(service.sayHello("Hulu"));
    14. }
    15. /**
    16. * 单例模式创建Channel
    17. */
    18. public static Channel getChannel() {
    19. if (channel == null) {
    20. synchronized (lock) {
    21. if (channel == null) {
    22. init();
    23. }
    24. }
    25. }
    26. return channel;
    27. }
    28. /**
    29. * 使用代理模式,帮助我们创建请求消息并发送
    30. */
    31. public static Object getProxy(Class serviceClass) {
    32. Class[] classes = new Class[]{serviceClass};
    33. // 使用JDK代理,创建代理对象
    34. Object o = Proxy.newProxyInstance(serviceClass.getClassLoader(), classes, new InvocationHandler() {
    35. @Override
    36. public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    37. // 创建请求消息
    38. int id = sequenceId.getAndIncrement();
    39. RpcRequestMessage message = new RpcRequestMessage(id, serviceClass.getName(),
    40. method.getName(), method.getReturnType(),
    41. method.getParameterTypes(),
    42. args);
    43. // 发送消息
    44. getChannel().writeAndFlush(message);
    45. // 创建Promise,用于获取NIO线程中的返回结果,获取的过程是异步的
    46. DefaultPromise promise = new DefaultPromise<>(getChannel().eventLoop());
    47. // 将Promise放入Map中
    48. RpcResponseMessageHandler.promiseMap.put(id, promise);
    49. // 等待被放入Promise中结果
    50. promise.await();
    51. if (promise.isSuccess()) {
    52. // 调用方法成功,返回方法执行结果
    53. return promise.getNow();
    54. } else {
    55. // 调用方法失败,抛出异常
    56. throw new RuntimeException(promise.cause());
    57. }
    58. }
    59. });
    60. return o;
    61. }
    62. private static void init() {
    63. NioEventLoopGroup group = new NioEventLoopGroup();
    64. LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
    65. MessageSharableCodec messageSharableCodec = new MessageSharableCodec();
    66. // PRC 请求消息处理器
    67. RpcResponseMessageHandler rpcResponseMessageHandler = new RpcResponseMessageHandler();
    68. Bootstrap bootstrap = new Bootstrap();
    69. bootstrap.channel(NioSocketChannel.class);
    70. bootstrap.group(group);
    71. bootstrap.handler(new ChannelInitializer() {
    72. @Override
    73. protected void initChannel(SocketChannel ch) throws Exception {
    74. ch.pipeline().addLast(new ProtocolFrameDecoder());
    75. ch.pipeline().addLast(loggingHandler);
    76. ch.pipeline().addLast(messageSharableCodec);
    77. ch.pipeline().addLast(rpcResponseMessageHandler);
    78. }
    79. });
    80. try {
    81. channel = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync().channel();
    82. // 异步关闭 group,避免Channel被阻塞
    83. channel.closeFuture().addListener(future -> {
    84. group.shutdownGracefully();
    85. });
    86. } catch (InterruptedException e) {
    87. e.printStackTrace();
    88. }
    89. }
    90. }
    91. 获得Channel

      • 建立连接,获取Channel的操作被封装到了init方法中,当连接断开时,通过addListener法异步关闭group

      • 通过单例模式创建与获取Channel

      远程调用方法

      • 为了让方法的调用变得简洁明了,将RpcRequestMessage创建与发送过程通过JDK的动态代理来完成
      • 通过返回的代理对象调用方法即可,方法参数为被调用方法接口的Class类

      远程调用方法返回值获取

      • 调用方法的是主线程,处理返回结果的是NIO线程(RpcResponseMessageHandler)。要在不同线程中进行返回值的传递,需要用到Promise

      • RpcResponseMessageHandler中创建一个Map

        • Key为SequenceId
        • Value为对应的Promise
      • 主线程的代理类将RpcResponseMessage发送给服务器后,需要创建Promise对象,并将其放入到RpcResponseMessageHandler的Map中。需要使用await等待结果被放入Promise中。获取结果后,根据结果类型(判断是否成功)来返回结果或抛出异常

      1. // 创建Promise,用于获取NIO线程中的返回结果,获取的过程是异步的
      2. DefaultPromise promise = new DefaultPromise<>(getChannel().eventLoop());
      3. // 将Promise放入Map中
      4. RpcResponseMessageHandler.promiseMap.put(id, promise);
      5. // 等待被放入Promise中结果
      6. promise.await();
      7. if (promise.isSuccess()) {
      8. // 调用方法成功,返回方法执行结果
      9. return promise.getNow();
      10. } else {
      11. // 调用方法失败,抛出异常
      12. throw new RuntimeException(promise.cause());
      13. }
      14. NIO线程负责通过SequenceId获取并移除(remove)对应的Promise,然后根据RpcResponseMessage中的结果,向Promise中放入不同的值

        • 如果没有异常信息(ExceptionValue),就调用promise.setSuccess(returnValue)放入方法返回值
        • 如果有异常信息,就调用promise.setFailure(exception)放入异常信息
        1. // 将返回结果放入对应的Promise中,并移除Map中的Promise
        2. Promise promise = promiseMap.remove(msg.getSequenceId());
        3. Object returnValue = msg.getReturnValue();
        4. Exception exception = msg.getExceptionValue();
        5. if (promise != null) {
        6. if (exception != null) {
        7. // 返回结果中有异常信息
        8. promise.setFailure(exception);
        9. } else {
        10. // 方法正常执行,没有异常
        11. promise.setSuccess(returnValue);
        12. }
        13. }
        14. 改进RpcResponseMessageHandler

          1. @ChannelHandler.Sharable
          2. public class RpcResponseMessageHandler extends SimpleChannelInboundHandler {
          3. static final Logger log = LoggerFactory.getLogger(ChatServer.class);
          4. /**
          5. * 用于存放Promise的集合,Promise用于主线程与NIO线程之间传递返回值
          6. */
          7. public static Map> promiseMap = new ConcurrentHashMap<>(16);
          8. @Override
          9. protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
          10. // 将返回结果放入对应的Promise中,并移除Map中的Promise
          11. Promise promise = promiseMap.remove(msg.getSequenceId());
          12. Object returnValue = msg.getReturnValue();
          13. Exception exception = msg.getExceptionValue();
          14. if (promise != null) {
          15. if (exception != null) {
          16. // 返回结果中有异常信息
          17. promise.setFailure(exception);
          18. } else {
          19. // 方法正常执行,没有异常
          20. promise.setSuccess(returnValue);
          21. }
          22. }
          23. // 拿到返回结果并打印
          24. log.debug("{}", msg);
          25. }
          26. }
          27. 相关阅读:
            猫狗肠道菌群—“主子们”的健康新领域
            SpringBoot-异常处理
            PostgreSQL11.17离线安装过程(X86+Ubuntu)
            B/S医院HIS绩效考核系统源码
            Java循环对比:传统for循环、增强型for循环和forEach循环
            Python面试题:如何在 Python 中实现一个简单的 Web 服务器?
            c++ 学习 之 const,constexpr,volatile
            【vue组件】使用element-ui table 实现嵌套表格 点击展开时获取数据
            什么是阿里云轻量应用服务器?
            日本it就职培训机构,日本IT行业的三种类型
          28. 原文地址:https://blog.csdn.net/weixin_73077810/article/details/133774863