• Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(一)


    在前面的文章中,我们分析了在NIO模式下,服务端接收和读取多个客户端数据的情况;

    本文,我们看下NIO模式下,使用Selector如何把数据发送给客户端。

    一、直接通过java.nio.channels.SocketChannel#write(java.nio.ByteBuffer)方法写

    二、注册OP_WRITE,在写状态满足情况下,通过java.nio.channels.SocketChannel#write(java.nio.ByteBuffer)方法写

    我们看下第一种情况:

    服务端在有数据需要发送给客户端的情况下,直接使用客户端对应的SocketChannel写数据即可,不考虑数据是否可以正常发送。

    这和BIO下,直接通过输出流写数据是一样的

    我们可以基于聊天室的功能,把收到的数据直接写给其他客户端

    客户端代码统一使用BIO,如下:

    1. import java.io.IOException;
    2. import java.io.InputStream;
    3. import java.io.OutputStream;
    4. import java.net.Socket;
    5. /**
    6. * 基于BIO的TCP网络通信的客户端,接收控制台输入的数据,然后通过字节流发送给服务端
    7. *
    8. */
    9. class ChatClient {
    10. public static void main(String[] args) throws IOException {
    11. // 连接server
    12. Socket serverSocket = new Socket("localhost", 9090);
    13. System.out.println("client connected to server");
    14. // 读取用户在控制台上的输入,并发送给服务器
    15. new Thread(new ClientThread(serverSocket)).start();
    16. // 接收服务端发送过来的数据
    17. try (InputStream serverSocketInputStream = serverSocket.getInputStream();) {
    18. byte[] buffer = new byte[1024];
    19. int len;
    20. while ((len = serverSocketInputStream.read(buffer)) != -1) {
    21. String data = new String(buffer, 0, len);
    22. System.out.println(
    23. "client receive data from server" + serverSocketInputStream + " data size:" + len + ": " + data);
    24. }
    25. }
    26. }
    27. }
    28. class ClientThread implements Runnable {
    29. private Socket serverSocket;
    30. public ClientThread(Socket serverSocket) {
    31. this.serverSocket = serverSocket;
    32. }
    33. @Override
    34. public void run() {
    35. // 读取用户在控制台上的输入,并发送给服务器
    36. InputStream in = System.in;
    37. byte[] buffer = new byte[1024];
    38. int len;
    39. try (OutputStream outputStream = serverSocket.getOutputStream();) {
    40. // read操作阻塞,直到有数据可读,由于后面还要接收服务端转发过来的数据,这两个操作都是阻塞的,所以需要两个线程
    41. while ((len = in.read(buffer)) != -1) {
    42. String data = new String(buffer, 0, len);
    43. System.out.println("client receive data from console" + in + " : " + new String(buffer, 0, len));
    44. if ("exit\n".equals(data)) {
    45. // 模拟客户端关闭连接
    46. System.out.println("client close :" + serverSocket);
    47. // 这里跳出循环后,try-with-resources 会自动关闭outputStream
    48. break;
    49. }
    50. // 发送数据给服务器端
    51. outputStream.write(new String(buffer, 0, len).getBytes()); // 此时buffer中是有换行符
    52. }
    53. } catch (IOException e) {
    54. throw new RuntimeException(e);
    55. }
    56. }
    57. }

    服务端代码如下:

    1. import java.io.IOException;
    2. import java.net.InetSocketAddress;
    3. import java.nio.ByteBuffer;
    4. import java.nio.channels.SelectableChannel;
    5. import java.nio.channels.SelectionKey;
    6. import java.nio.channels.Selector;
    7. import java.nio.channels.ServerSocketChannel;
    8. import java.nio.channels.SocketChannel;
    9. import java.util.Iterator;
    10. import java.util.Set;
    11. /**
    12. * 基于NIO实现服务端,通过Selector基于事件驱动客户端的读取
    13. * 服务端接收到数据后,直接写回客户端,或者写给其他客户端
    14. *
    15. */
    16. class NIOSelectorDirectWriteServer {
    17. Selector selector;
    18. public static void main(String[] args) throws IOException {
    19. NIOSelectorDirectWriteServer server = new NIOSelectorDirectWriteServer();
    20. server.start(); // 开启监听和事件处理
    21. }
    22. public void start() {
    23. initServer();
    24. // selector非阻塞轮询有哪些感兴趣的事件到了
    25. doService();
    26. }
    27. private void doService() {
    28. if (selector == null) {
    29. System.out.println("server init failed, without doing read/write");
    30. return;
    31. }
    32. try {
    33. while (true) {
    34. while (selector.select() > 0) {
    35. Set keys = selector.selectedKeys(); // 感兴趣且准备好的事件
    36. Iterator iterator = keys.iterator(); // 迭代器遍历处理,后面要删除集合元素
    37. while (iterator.hasNext()) {
    38. SelectionKey key = iterator.next();
    39. iterator.remove(); // 删除当前元素,防止重复处理
    40. // 下面根据事件进行分别处理
    41. if (key.isAcceptable()) {
    42. // 客户端连接事件
    43. acceptHandler(key);
    44. } else if (key.isReadable()) {
    45. // 读取客户端数据
    46. readHandler(key);
    47. }
    48. }
    49. }
    50. }
    51. } catch (IOException exception) {
    52. exception.printStackTrace();
    53. }
    54. }
    55. private void initServer() {
    56. try {
    57. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    58. serverSocketChannel.configureBlocking(false);
    59. serverSocketChannel.bind(new InetSocketAddress(9090));
    60. // 此时在selector上注册感兴趣的事件
    61. // 这里先注册OP_ACCEPT: 客户端连接事件
    62. selector = Selector.open();
    63. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    64. System.out.println("server init success");
    65. } catch (IOException exception) {
    66. exception.printStackTrace();
    67. System.out.println("server init failied");
    68. }
    69. }
    70. public void acceptHandler(SelectionKey key) {
    71. ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 获取客户端的channel
    72. try {
    73. SocketChannel client = server.accept();
    74. client.configureBlocking(false); // 设置client非阻塞
    75. System.out.println("server receive a client :" + client);
    76. // 注册OP_READ事件,用于从客户端读取数据
    77. // 给Client分配一个buffer,用于读取数据,注意buffer的线程安全
    78. ByteBuffer buffer = ByteBuffer.allocate(1024); // buffer这个参数注册的时候也可以不用
    79. client.register(key.selector(), SelectionKey.OP_READ, buffer);
    80. } catch (IOException exception) {
    81. exception.printStackTrace();
    82. }
    83. }
    84. public void readHandler(SelectionKey key) {
    85. System.out.println("read handler");
    86. SocketChannel client = (SocketChannel) key.channel(); // 获取客户端的channel
    87. ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取Client channel关联的buffer
    88. buffer.clear(); // 使用前clear
    89. // 防止数据分包,需要while循环读取
    90. try {
    91. while (true) {
    92. int readLen = client.read(buffer);
    93. if (readLen > 0) {
    94. // 读取到数据了
    95. buffer.flip();
    96. byte[] data = new byte[buffer.limit()];
    97. buffer.get(data);
    98. System.out.println("server read data from " + client + ", data is :" + new String(data));
    99. // 转发给其他客户端
    100. forwardMsg(client, data);
    101. } else if (readLen == 0) {
    102. // 没读到数据
    103. System.out.println(client + " : no data");
    104. break;
    105. } else if (readLen < 0) {
    106. // client 关闭连接
    107. // 当客户端主动连接断开时,为了让服务器知道断开了连接,会产生OP_READ事件。所以需要判断读取长度,当读到-1时,关闭channel。
    108. System.out.println(client + " close");
    109. client.close();
    110. break;
    111. }
    112. }
    113. } catch (IOException exception) {
    114. exception.printStackTrace();
    115. // client 关闭连接
    116. System.out.println(client + " disconnect");
    117. // todo:disconnect 导致一直有read事件,怎么办?
    118. try {
    119. client.close();
    120. } catch (IOException ex) {
    121. System.out.println("close ex");
    122. }
    123. }
    124. }
    125. private void forwardMsg(SocketChannel myself, byte[] msg) throws IOException {
    126. Set keys = selector.keys();
    127. // read/write 对应同一个key,同一个client不会发送两遍
    128. for (SelectionKey key : keys) {
    129. SelectableChannel channel = key.channel();
    130. if (channel instanceof SocketChannel && channel != myself) {
    131. // 发送数据
    132. SocketChannel client = (SocketChannel) channel;
    133. System.out.println("forward msg to " + client);
    134. ByteBuffer buff = ByteBuffer.wrap(msg);
    135. while (buff.hasRemaining()) {
    136. client.write(buff);
    137. }
    138. }
    139. }
    140. }
    141. }

    可以看到,这里我们在forwardMsg()方法中,直接通过channel 把数据转发出去了

    测试:

    先启动服务器,然后启动两个客户端

    客户端1发送消息client1,服务端接收到后转发给客户端2

    客户端2发送消息client2,服务端接收到后转发给客户端1

    服务端日志:

    1. server init success
    2. server receive a client :java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536]
    3. server receive a client :java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549]
    4. read handler
    5. server read data from java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536], data is :client1
    6. forward msg to java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549]
    7. java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536] : no data
    8. read handler
    9. server read data from java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549], data is :client2
    10. forward msg to java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536]
    11. java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549] : no data

    客户端1日志:

    1. client connected to server
    2. client1
    3. client receive data from consolejava.io.BufferedInputStream@65231a33 : client1
    4. client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client2

    客户端2日志:

    1. client connected to server
    2. client receive data from serverjava.net.Socket$SocketInputStream@27f8302d data size:8: client1
    3. client2
    4. client receive data from consolejava.io.BufferedInputStream@72cdfe9a : client2

    测试:客户端1 正常退出关闭

    客户端1日志:

    1. exit
    2. client receive data from consolejava.io.BufferedInputStream@65231a33 : exit
    3. client close :Socket[addr=localhost/127.0.0.1,port=9090,localport=23536]
    4. Exception in thread "main" java.net.SocketException: Socket closed
    5. at java.base/sun.nio.ch.NioSocketImpl.endRead(NioSocketImpl.java:248)
    6. at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:327)
    7. at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
    8. at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
    9. at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
    10. at java.base/java.io.InputStream.read(InputStream.java:218)
    11. at com.huawei.io.chatroom.bio.ChatClient.main(ChatClient.java:29)
    12. Process finished with exit code 1

    服务端日志:

    1. read handler
    2. java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536] close

    测试:客户端2异常断开

    服务端日志:

    1. read handler
    2. java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549] disconnect
    3. java.net.SocketException: Connection reset
    4. at java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:394)
    5. at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:426)
    6. at com.huawei.io.chatroom.nio.sel.NIOSelectorDirectWriteServer.readHandler(NIOSelectorDirectWriteServer.java:107)
    7. at com.huawei.io.chatroom.nio.sel.NIOSelectorDirectWriteServer.doService(NIOSelectorDirectWriteServer.java:56)
    8. at com.huawei.io.chatroom.nio.sel.NIOSelectorDirectWriteServer.start(NIOSelectorDirectWriteServer.java:34)
    9. at com.huawei.io.chatroom.nio.sel.NIOSelectorDirectWriteServer.main(NIOSelectorDirectWriteServer.java:28)

    二、注册OP_WRITE写数据

  • 相关阅读:
    架构风格相关内容
    页面分配策略(驻留集、页面分配、置换策略、抖动现象、工作集)
    进程同步与互斥
    Android 的整体架构
    java-php-net-python-工会管理系统计算机毕业设计程序
    Apache POC(对Excel文件操作)
    SaaSBase:什么是天润融通?
    网关BL100做主站采集从机设备上云平台示例
    C++智能指针
    分享im即时通讯开发之WebSocket:概念、原理、易错常识
  • 原文地址:https://blog.csdn.net/u013771019/article/details/138153944