在前面的文章中,我们分析了在NIO模式下,服务端接收和读取多个客户端数据的情况;
本文,我们看下NIO模式下,使用Selector如何把数据发送给客户端。
一、直接通过java.nio.channels.SocketChannel#write(java.nio.ByteBuffer)方法写
二、注册OP_WRITE,在写状态满足情况下,通过java.nio.channels.SocketChannel#write(java.nio.ByteBuffer)方法写
我们看下第一种情况:
服务端在有数据需要发送给客户端的情况下,直接使用客户端对应的SocketChannel写数据即可,不考虑数据是否可以正常发送。
这和BIO下,直接通过输出流写数据是一样的
我们可以基于聊天室的功能,把收到的数据直接写给其他客户端
客户端代码统一使用BIO,如下:
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.net.Socket;
-
- /**
- * 基于BIO的TCP网络通信的客户端,接收控制台输入的数据,然后通过字节流发送给服务端
- *
- */
- class ChatClient {
- public static void main(String[] args) throws IOException {
- // 连接server
- Socket serverSocket = new Socket("localhost", 9090);
- System.out.println("client connected to server");
-
- // 读取用户在控制台上的输入,并发送给服务器
- new Thread(new ClientThread(serverSocket)).start();
-
- // 接收服务端发送过来的数据
- try (InputStream serverSocketInputStream = serverSocket.getInputStream();) {
- byte[] buffer = new byte[1024];
- int len;
- while ((len = serverSocketInputStream.read(buffer)) != -1) {
- String data = new String(buffer, 0, len);
- System.out.println(
- "client receive data from server" + serverSocketInputStream + " data size:" + len + ": " + data);
- }
- }
-
- }
- }
-
- class ClientThread implements Runnable {
- private Socket serverSocket;
-
- public ClientThread(Socket serverSocket) {
- this.serverSocket = serverSocket;
- }
-
- @Override
- public void run() {
- // 读取用户在控制台上的输入,并发送给服务器
- InputStream in = System.in;
- byte[] buffer = new byte[1024];
- int len;
- try (OutputStream outputStream = serverSocket.getOutputStream();) {
- // read操作阻塞,直到有数据可读,由于后面还要接收服务端转发过来的数据,这两个操作都是阻塞的,所以需要两个线程
- while ((len = in.read(buffer)) != -1) {
- String data = new String(buffer, 0, len);
- System.out.println("client receive data from console" + in + " : " + new String(buffer, 0, len));
- if ("exit\n".equals(data)) {
- // 模拟客户端关闭连接
- System.out.println("client close :" + serverSocket);
- // 这里跳出循环后,try-with-resources 会自动关闭outputStream
- break;
- }
- // 发送数据给服务器端
- outputStream.write(new String(buffer, 0, len).getBytes()); // 此时buffer中是有换行符
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
服务端代码如下:
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectableChannel;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
- import java.util.Set;
-
- /**
- * 基于NIO实现服务端,通过Selector基于事件驱动客户端的读取
- * 服务端接收到数据后,直接写回客户端,或者写给其他客户端
- *
- */
- class NIOSelectorDirectWriteServer {
- Selector selector;
-
- public static void main(String[] args) throws IOException {
- NIOSelectorDirectWriteServer server = new NIOSelectorDirectWriteServer();
- server.start(); // 开启监听和事件处理
- }
-
- public void start() {
- initServer();
- // selector非阻塞轮询有哪些感兴趣的事件到了
- doService();
- }
-
- private void doService() {
- if (selector == null) {
- System.out.println("server init failed, without doing read/write");
- return;
- }
- try {
- while (true) {
- while (selector.select() > 0) {
- Set
keys = selector.selectedKeys(); // 感兴趣且准备好的事件 - Iterator
iterator = keys.iterator(); // 迭代器遍历处理,后面要删除集合元素 - while (iterator.hasNext()) {
- SelectionKey key = iterator.next();
- iterator.remove(); // 删除当前元素,防止重复处理
- // 下面根据事件进行分别处理
- if (key.isAcceptable()) {
- // 客户端连接事件
- acceptHandler(key);
- } else if (key.isReadable()) {
- // 读取客户端数据
- readHandler(key);
- }
- }
- }
- }
- } catch (IOException exception) {
- exception.printStackTrace();
- }
- }
-
- private void initServer() {
- try {
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(false);
- serverSocketChannel.bind(new InetSocketAddress(9090));
-
- // 此时在selector上注册感兴趣的事件
- // 这里先注册OP_ACCEPT: 客户端连接事件
- selector = Selector.open();
- serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
- System.out.println("server init success");
- } catch (IOException exception) {
- exception.printStackTrace();
- System.out.println("server init failied");
- }
- }
-
- public void acceptHandler(SelectionKey key) {
- ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 获取客户端的channel
- try {
- SocketChannel client = server.accept();
- client.configureBlocking(false); // 设置client非阻塞
- System.out.println("server receive a client :" + client);
- // 注册OP_READ事件,用于从客户端读取数据
- // 给Client分配一个buffer,用于读取数据,注意buffer的线程安全
- ByteBuffer buffer = ByteBuffer.allocate(1024); // buffer这个参数注册的时候也可以不用
- client.register(key.selector(), SelectionKey.OP_READ, buffer);
- } catch (IOException exception) {
- exception.printStackTrace();
- }
- }
-
- public void readHandler(SelectionKey key) {
- System.out.println("read handler");
- SocketChannel client = (SocketChannel) key.channel(); // 获取客户端的channel
- ByteBuffer buffer = (ByteBuffer) key.attachment(); // 获取Client channel关联的buffer
- buffer.clear(); // 使用前clear
-
- // 防止数据分包,需要while循环读取
- try {
- while (true) {
- int readLen = client.read(buffer);
- if (readLen > 0) {
- // 读取到数据了
- buffer.flip();
- byte[] data = new byte[buffer.limit()];
- buffer.get(data);
- System.out.println("server read data from " + client + ", data is :" + new String(data));
- // 转发给其他客户端
- forwardMsg(client, data);
- } else if (readLen == 0) {
- // 没读到数据
- System.out.println(client + " : no data");
- break;
- } else if (readLen < 0) {
- // client 关闭连接
- // 当客户端主动连接断开时,为了让服务器知道断开了连接,会产生OP_READ事件。所以需要判断读取长度,当读到-1时,关闭channel。
- System.out.println(client + " close");
- client.close();
- break;
- }
- }
- } catch (IOException exception) {
- exception.printStackTrace();
- // client 关闭连接
- System.out.println(client + " disconnect");
- // todo:disconnect 导致一直有read事件,怎么办?
- try {
- client.close();
- } catch (IOException ex) {
- System.out.println("close ex");
- }
- }
- }
-
- private void forwardMsg(SocketChannel myself, byte[] msg) throws IOException {
- Set
keys = selector.keys(); - // read/write 对应同一个key,同一个client不会发送两遍
- for (SelectionKey key : keys) {
- SelectableChannel channel = key.channel();
- if (channel instanceof SocketChannel && channel != myself) {
- // 发送数据
- SocketChannel client = (SocketChannel) channel;
- System.out.println("forward msg to " + client);
- ByteBuffer buff = ByteBuffer.wrap(msg);
- while (buff.hasRemaining()) {
- client.write(buff);
- }
- }
- }
- }
- }
可以看到,这里我们在forwardMsg()方法中,直接通过channel 把数据转发出去了
测试:
先启动服务器,然后启动两个客户端
客户端1发送消息client1,服务端接收到后转发给客户端2
客户端2发送消息client2,服务端接收到后转发给客户端1
服务端日志:
- server init success
- server receive a client :java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536]
- server receive a client :java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549]
- read handler
- server read data from java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536], data is :client1
-
- forward msg to java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549]
- java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536] : no data
- read handler
- server read data from java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549], data is :client2
-
- forward msg to java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536]
- java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549] : no data
客户端1日志:
- client connected to server
- client1
- client receive data from consolejava.io.BufferedInputStream@65231a33 : client1
-
- client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client2
客户端2日志:
- client connected to server
- client receive data from serverjava.net.Socket$SocketInputStream@27f8302d data size:8: client1
-
- client2
- client receive data from consolejava.io.BufferedInputStream@72cdfe9a : client2
测试:客户端1 正常退出关闭
客户端1日志:
- exit
- client receive data from consolejava.io.BufferedInputStream@65231a33 : exit
-
- client close :Socket[addr=localhost/127.0.0.1,port=9090,localport=23536]
- Exception in thread "main" java.net.SocketException: Socket closed
- at java.base/sun.nio.ch.NioSocketImpl.endRead(NioSocketImpl.java:248)
- at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:327)
- at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:350)
- at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:803)
- at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
- at java.base/java.io.InputStream.read(InputStream.java:218)
- at com.huawei.io.chatroom.bio.ChatClient.main(ChatClient.java:29)
-
- Process finished with exit code 1
服务端日志:
- read handler
- java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23536] close
测试:客户端2异常断开
服务端日志:
- read handler
- java.nio.channels.SocketChannel[connected local=/127.0.0.1:9090 remote=/127.0.0.1:23549] disconnect
- java.net.SocketException: Connection reset
- at java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:394)
- at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:426)
- at com.huawei.io.chatroom.nio.sel.NIOSelectorDirectWriteServer.readHandler(NIOSelectorDirectWriteServer.java:107)
- at com.huawei.io.chatroom.nio.sel.NIOSelectorDirectWriteServer.doService(NIOSelectorDirectWriteServer.java:56)
- at com.huawei.io.chatroom.nio.sel.NIOSelectorDirectWriteServer.start(NIOSelectorDirectWriteServer.java:34)
- at com.huawei.io.chatroom.nio.sel.NIOSelectorDirectWriteServer.main(NIOSelectorDirectWriteServer.java:28)
二、注册OP_WRITE写数据