• 06【NIO核心组件之Selector】



    六、Selector

    6.1 Selector的简介

    6.1.1 Seelctor概念

    Selector选择器,也叫多路复用器;NIO中实现非阻塞 I/O 的核心对象就是 Selector。当一个连接创建后,不需要创建一个线程来处理这个来连接,这个连接(管道)会被注册到选择器上,选择器可以检查一个或多个 NIO 通道(轮询),并确定哪些通道已经准备好进行读取或写入。这样,一个单独的线程可以管理多个channel,系统不必创建大量的线程,也不必维护这些线程,从而大大减小了系统的开销。

    在这里插入图片描述

    需要注意的是:不是所有的 Channel 都可以被 Selector 复用的。能被注册到Selector的Channel必须要实现SelectableChannel类;比如FileChannel就不支持被注册到Selector。另外,被注册到Selector的Channel都必须设置为非阻塞模式;

    6.1.2 Channel的注册

    Channel注册到Selector后,由Selector来轮询监听所有被注册到该Selector的Channel所触发的事件,这个时候我们就需要关心Selector应该监听Channel触发的哪些事件;

    • Selector能注册的监听事件如下:
      • SelectionKey.OP_READ(1):读事件,当被轮询的Channel读缓冲区有数据可读时触发。

      • SelectionKey.OP_WRITE(4):可写事件,当被轮询的Channel写缓冲区有空闲空间时触发。一般情况下写缓冲区都有空闲空间,小块数据直接写入即可,没必要注册该操作类型,否则该条件不断就绪浪费 CPU;但如果是写密集型的任务,比如文件下载等,缓冲区很可能满,注册该操作类型就很有必要,同时注意写完后取消注册。

      • SelectionKey.OP_CONNECT(8):连接事件,当被轮询到Channel成功连接到其他服务器时触发;

      • SelectionKey.OP_ACCEPT(16):接收事件,当被轮询到Channel接受到新的连接时触发;

    Tips:可写事件比较特殊,一般我们不注册写事件,写操作的就绪条件为底层缓冲区有空闲空间,而写缓冲区绝大部分时间都是有空闲空间的;因此写操作一直是就绪状态的,所以,只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。

    6.2 Selector的事件注册与监听

    6.2.1 选择通道

    一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。select()方法会返回已经准备就绪的通道数量;

    • int select():获取上一次调用select()方法后,进入就绪通道(包括连接就绪/接受就绪/读就绪/写就绪等)的数量;该方法会阻塞到至少有一个通道在你注册的事件上就绪了。
    • int select(long timeout):和select()方法一样,阻塞timeout毫秒后还没有就绪的事件暂时释放一下线程(返回0),接着进行下一次阻塞
    • int selectNow():不会阻塞,不管什么是否有事件就绪都立刻返回(返回0)

    测试代码:

    package com.dfbz.selector;
    
    import org.junit.Test;
    
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    
    public class Demo01_count方法 {
    
        @Test
        public void server() throws Exception {
    
            // 创建一个服务器
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    
            // 绑定地址
            serverSocketChannel.bind(new InetSocketAddress(9999));
    
            // 设置为非阻塞模式
            serverSocketChannel.configureBlocking(false);
    
            // 获取一个选择器
            Selector selector = Selector.open();
    
            /*
                注册事件:
                    * 读 : SelectionKey.OP_READ (1)
                    * 写 : SelectionKey.OP_WRITE (4)
                    * 连接 : SelectionKey.OP_CONNECT (8)
                    * 接收 : SelectionKey.OP_ACCEPT (16)
             */
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
            while (true) {
    
                /*
                    获取有多少通道已经就绪(包括连接就绪/接受就绪/读就绪/写就绪等)
                    select方法将会一直阻塞到至少要有一个通道在注册的事件上就绪(返回1),之后将会一直轮询调用select方法获取就绪的事件数量
                 */
                int count = selector.select();
    
                System.out.println("count: " + count);
    
                Thread.sleep(1000);
            }
        }
    
        @Test
        public void client() throws Exception {
            SocketChannel socketChannel = SocketChannel.open();
    
            // 连接服务器
            socketChannel.connect(new InetSocketAddress(9999));
    
            System.out.println("连接成功");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    6.2.2 事件注册与监听

    我们前面了解到,Selector能注册的监听事件有四种(接收/连接/读/写),当Selector监听到某个事件后可以通过selectedKeys()方法获取那些活跃的事件;

    并不是所有的Channel都支持注册所有的事件,下表描述各种 Channel 允许注册的操作类型,Y 表示允许注册,其中服务器 SocketChannel 指由服务器 ServerSocketChannel.accept()返回的对象。

    项目OP_READOP_WRITEOP_CONNECTOP_ACCEPT
    服务器 ServerSocketChannelY
    服务器 SocketChannelYY
    客户端 SocketChannelYYY
    • public Set<SelectionKey> selectedKeys():获取Selector监听到的事件集合。一个SelectionKey对象代表一个监听事件;

    SelectionKey监听具体事件的方法:

    • public boolean isAcceptable():监听接收事件,当客户端连接到服务端时,服务端的接收事件将被触发;
    • public boolean isConnectable():监听连接事件,当客户端连接到服务端时,客户端的连接事件将被触发;
    • public boolean isReadable():监听读事件,当客户端向服务端写出数据时,服务端的SocketChannel将触发可读数据;
    • public boolean isWritable():监听写事件,当被轮询的Channel写缓冲区有空闲空间时触发(一般情况下都会触发)
    • public boolean isValid():判断当前这个通道是否是有效的;

    1)监听Accept事件

    只有ServerSocketChannel才可以注册Accept事件,因为只有ServerSocketChannel才可以接收其他Channel的请求;

    • 服务端:
    package com.dfbz.selector;
    
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class Demo02_监听接收事件_服务端 {
    
        public static void main(String[] args) throws Exception {
            // 创建一个服务器
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    
            // 绑定地址
            serverSocketChannel.bind(new InetSocketAddress(9999));
    
            // 设置为非阻塞模式
            serverSocketChannel.configureBlocking(false);
    
            // 获取一个选择器
            Selector selector = Selector.open();
    
            /*
                注册事件:
                    * 读 : SelectionKey.OP_READ (1)
                    * 写 : SelectionKey.OP_WRITE (4)
                    * 连接 : SelectionKey.OP_CONNECT (8)
                    * 接收 : SelectionKey.OP_ACCEPT (16): 是否有新的连接
             */
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
            while (true) {
                // 获取有多少通道已经准备就绪
                int count = selector.select();
    
                System.out.println(count);
                
                // 获取那些已经准备就绪的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
    
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
                while (iterator.hasNext()) {
                    // 迭代获取每一个事件
                    SelectionKey event = iterator.next();
                    if (event.isAcceptable()) {
                        // 接收就绪事件(当有客户端连接到服务器时触发)
                        System.out.println("isAcceptable");
                    }
                }
    
                Thread.sleep(1000);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 客户端:
    package com.dfbz.selector;
    
    import java.net.InetSocketAddress;
    import java.nio.channels.SocketChannel;
    
    public class Demo03_监听接收事件_客户端 {
        public static void main(String[] args) throws Exception{
            SocketChannel socketChannel = SocketChannel.open();
    
            // 连接服务器
            socketChannel.connect(new InetSocketAddress(9999));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    再上述案例中,当服务器端的Selector监听到了接收事件时(客户端连接服务器时),select方法将会返回1,并且服务端通过调用Selector的selectedKeys()方法可以获取监听到的具体事件;这里存在两个问题:

    • 1)当Selector被激活后(第一次监听到事件),Selector将会一直轮询select方法,如果没有新的事件准备就绪则返回0,有事件则f返回触发事件的数量;如果select()方法返回0,说明没有新的就绪事件,本次循环应该结束;
    • 2)调用selectedKeys()方法后,可以获取Selector监听到的事件,获取到事件后,我们可以通过一系列方法来判断到底触发的是什么事件,然后做具体的逻辑处理;但是逻辑处理完毕后,事件依旧存在,不会被移除。也就是说,下次调用selectedKeys()方法获取Selector监听到的事件时,上一次依旧处理的事件仍然存在,因此我们在处理完事件后,必须将事件移除;

    服务端改进代码:

    package com.dfbz.selector;
    
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class Demo02_监听接收事件_服务端 {
    
        public static void main(String[] args) throws Exception {
            // 创建一个服务器
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    
            // 绑定地址
            serverSocketChannel.bind(new InetSocketAddress(9999));
    
            // 设置为非阻塞模式
            serverSocketChannel.configureBlocking(false);
    
            // 获取一个选择器
            Selector selector = Selector.open();
    
            /*
                注册事件:
                    * 读 : SelectionKey.OP_READ (1)
                    * 写 : SelectionKey.OP_WRITE (4)
                    * 连接 : SelectionKey.OP_CONNECT (8)
                    * 接收 : SelectionKey.OP_ACCEPT (16): 是否有新的连接
             */
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
            while (true) {
                // 获取有多少通道已经准备就绪
                int count = selector.select();
                System.out.println(count);
    
                /*if (count == 0) {
                    // 代表还没有准备就绪的事件
                    continue;
                }*/
    
                // 获取那些已经准备就绪的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
    
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
                while (iterator.hasNext()) {
                    // 迭代获取每一个事件
                    SelectionKey event = iterator.next();
                    if (event.isAcceptable()) {
                        // 接收就绪事件(当有客户端连接到服务器时触发)
                        System.out.println("isAcceptable");
    
                        // 接收客户端(如果不接收,那么selector将一直监听到serverSocketChannel的接收事件)
                        SocketChannel socketChannel = serverSocketChannel.accept();
                    }
                    /*
                        处理完事件后记得要移除事件,否则该一直存在selectedKeys集合中
                        如果不移除,下次调用selectedKeys()方法时,即使没有触发该事件也能获取到该事件
                     */
                    iterator.remove();
                }
    
                Thread.sleep(1000);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    2)监听Connect事件

    只有客户端的SocketChannel才可以监听Connect事件,因为只有客户端的SocketChanel才可以连接其他Channel(连接服务端);

    • 示例代码-服务端:
    package com.dfbz.selector;
    
    import java.net.InetSocketAddress;
    import java.nio.channels.ServerSocketChannel;
    
    public class Demo04_监听连接事件_服务端 {
        public static void main(String[] args) throws Exception {
    
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(8888));
    
            while (true) {
                serverSocketChannel.accept();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 示例代码-客户端:
    package com.dfbz.selector;
    
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class Demo05_监听连接事件_客户端 {
        public static void main(String[] args) throws Exception {
            // 获取一个客户端的socketChannel
            SocketChannel socketChannel = SocketChannel.open();
    
            // 设置为非阻塞面模式
            socketChannel.configureBlocking(false);
    
            // 获取一个Selector选择器
            Selector selector = Selector.open();
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
    
            // 开启一个新的线程来开启监听任务
            new Thread() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            // 当selector监听到具体的事件后
                            int count = selector.select();
                            
                            if (count == 0) {
                                continue;
                            }
                            // 获取监听的事件集合
                            Set<SelectionKey> selectionKeys = selector.selectedKeys();
                            Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
                            // 迭代遍历每一个事件
                            while (iterator.hasNext()) {
                                SelectionKey selectionKey = iterator.next();
    
                                // 判断是否是触发连接事件
                                if (selectionKey.isConnectable()) {
                                    System.out.println("isConnectable");
                                }
    
                                // 移除事件
                                iterator.remove();
                            }
    
                            Thread.sleep(1000);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
    
                }
            }.start();
    
            // 连接服务器(触发连接事件)
            socketChannel.connect(new InetSocketAddress(9999));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    3)监听Read事件

    客户端和服务端的SocketChannel都可以监听Read事件,但ServerSocketChannel不可以监听读事件;

    1)监听服务端的SocketChannel
    • 监听服务端的SocketChannel-读事件-服务端代码:
    package com.dfbz.selector;
    
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class Demo06_监听服务端的Read事件_服务端 {
        public static void main(String[] args) throws Exception {
    
            // 创建一个服务器
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(7777));
    
            System.out.println("等待客户端连接...");
            // 接收到一个客户端
            SocketChannel socketChannel = serverSocketChannel.accept();
    
            System.out.println("客户端【" + socketChannel.getRemoteAddress() + "】连接成功");
    
            // 将客户端的channel设置为非阻塞模式(需要注册到Selector)
            socketChannel.configureBlocking(false);
    
            // 创建一个Selector选择器
            Selector selector = Selector.open();
    
            // 将客户的Channel注册到选择器上,并让选择器监听该socketChanel的读事件(当客户端发送数据来时将触发)
            socketChannel.register(selector, SelectionKey.OP_READ);
    
            // 创建一个buffer用来接收客户端发送过来的数据
            ByteBuffer buffer = ByteBuffer.allocate(1024);
    
            while (true) {
    
                // 是否有事件就绪
                int count = selector.select();
    
                if (count == 0) {
                    continue;
                }
    
                // 获取触发的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
    
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
                // 迭代每个事件
                while (iterator.hasNext()) {
    
                    SelectionKey selectionKey = iterator.next();
    
                    // 判断是否触发可读事件
                    if (selectionKey.isReadable()) {
                        System.out.println("触发读事件啦,客户端发送过来的数据是: ");
    
                        socketChannel.read(buffer);
                        buffer.clear();
                        System.out.println(new String(buffer.array(), 0, buffer.array().length));
    
                    }
                    iterator.remove();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 监听服务端的SocketChannel-读事件-客户端代码:
    package com.dfbz.selector;
    
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    import java.util.Scanner;
    
    public class Demo07_监听客户端的Read事件_客户端 {
        public static void main(String[] args) throws Exception {
            // 获取一个客户端的socketChannel
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress(7777));
    
            Scanner scanner = new Scanner(System.in);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (true){
    
                String str = scanner.nextLine();
    
                buffer.put(str.getBytes());
    
                // limit=position,position=0
                buffer.flip();
    
                // 将数据写道服务器的SocketChannel(服务器的SocketChannel触发读事件)
                socketChannel.write(buffer);        // socketChannel会把数据从buffer中读出来,造成position位移
    
                // position=0,limit=capacity
                buffer.clear();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    2)监听客户端的SocketChannel
    • 监听客户端的SocketChannel-读事件-服务端代码:
    package com.dfbz.selector;
    
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Scanner;
    import java.util.Set;
    
    public class Demo08_监听服务端的Read事件_服务端 {
        public static void main(String[] args) throws Exception {
    
            // 创建一个服务器
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(7777));
    
            System.out.println("等待客户端连接...");
            // 接收到一个客户端
            SocketChannel socketChannel = serverSocketChannel.accept();
    
            System.out.println("客户端【" + socketChannel.getRemoteAddress() + "】连接成功");
    
            // 创建Buffer用于存储要发送给客户端的数据
            ByteBuffer buffer = ByteBuffer.allocate(1024);
    
            Scanner scanner = new Scanner(System.in);
    
            while (true){
    
                String str = scanner.nextLine();
    
                buffer.put(str.getBytes());
    
                // limit=position,position=0
                buffer.flip();
    
                // 将数据写道客户端的SocketChannel(客户端的SocketChannel触发读事件)
                socketChannel.write(buffer);        // socketChannel会把数据从buffer中读出来,造成position位移
    
                // position=0,limit=capacity
                buffer.clear();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 监听客户端的SocketChannel-读事件-客户端代码:
    package com.dfbz.selector;
    
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class Demo09_监听客户端的Read事件_客户端 {
        public static void main(String[] args) throws Exception {
            // 获取一个客户端的socketChannel
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress(7777));
            socketChannel.configureBlocking(false);
    
            // 创建选择器
            Selector selector = Selector.open();
    
            // 注册读事件(当服务端有数据写到客户端时触发)
            socketChannel.register(selector, SelectionKey.OP_READ);
    
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (true) {
    
                // 是否有事件就绪
                int count = selector.select();
    
                if (count == 0) {
                    continue;
                }
    
                // 获取触发的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
    
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
                // 迭代每个事件
                while (iterator.hasNext()) {
    
                    SelectionKey selectionKey = iterator.next();
    
                    // 判断是否触发可读事件
                    if (selectionKey.isReadable()) {
                        System.out.println("触发读事件啦,服务端发送过来的数据是: ");
    
                        socketChannel.read(buffer);
                        buffer.clear();
                        System.out.println(new String(buffer.array(), 0, buffer.array().length));
    
                    }
                    iterator.remove();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    4)监听Write事件

    服务端和客户端的SocketChannel都可以注册写事件,当SocketChannel写就绪时触发该事件,默认情况下,写一直都处于就绪状态,因此一旦SocketChannel监听了写事件,Selector的select将永远返回1(事件永远准备就绪);

    package com.dfbz.selector;
    
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class Demo10_监听服务端的Write事件_服务端 {
        public static void main(String[] args) throws Exception {
    
            // 创建一个服务器
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(6666));
    
            System.out.println("等待客户端连接...");
            // 接收到一个客户端
            SocketChannel socketChannel = serverSocketChannel.accept();
    
            System.out.println("客户端【" + socketChannel.getRemoteAddress() + "】连接成功");
    
            // 将客户端的channel设置为非阻塞模式(需要注册到Selector)
            socketChannel.configureBlocking(false);
    
            // 创建一个Selector选择器
            Selector selector = Selector.open();
    
            // 将socketChannel注册到Selector上,并监听写事件
            socketChannel.register(selector, SelectionKey.OP_WRITE);
    
            while (true) {
    
                // 是否有事件就绪
                int count = selector.select();
    
                if (count == 0) {
                    continue;
                }
    
                // 获取触发的事件
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
    
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
                // 迭代每个事件
                while (iterator.hasNext()) {
    
                    SelectionKey selectionKey = iterator.next();
    
                    // 判断是否触发可读事件(默认情况下一直是处于可写状态)
                    if (selectionKey.isWritable()) {
                        System.out.println("isWritable");
                    }
                    iterator.remove();
    
                    Thread.sleep(1000);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 客户端:
    package com.dfbz.selector;
    
    import java.net.InetSocketAddress;
    import java.nio.channels.SocketChannel;
    
    public class Demo11_监听服务端的Write事件_客户端 {
        public static void main(String[] args) throws Exception{
            SocketChannel socketChannel = SocketChannel.open();
    
            // 连接服务器(服务器的接收事件将要触发)
            socketChannel.connect(new InetSocketAddress(6666));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    Tips:客户端的SocketChannel也可以监听写事件,代码和服务端的SocketChannel一致,这里就不举例了;

    6.3 Selector其他方法

    • Selector常用方法:

      • public abstract Set<SelectionKey> selectedKeys()
      • public abstract Set<SelectionKey> keys()
      • public abstract void close()
      • public abstract boolean isOpen()
      • public abstract Selector wakeup()
      • public abstract SelectorProvider provider()
    • SelectionKey常用方法:

      • public abstract SelectableChannel channel()
      • public abstract int interestOps()
      • public abstract SelectionKey interestOps(int ops)
      • public abstract int readyOps()
      • public abstract void cancel()
      • public final Object attach(Object ob)
      • public final Object attachment()
      • public abstract Selector selector()

    6.4 Selector综合案例

    Selector正是Java NIO实现IO多路复用的核心组件,当Channel都被注册到Selector后,服务端只需要一个线程来轮询查看注册到Selector的哪些Channel,如果有准备就绪的事件将会执行;这样一来就可以解决我们传统情况下的一个客户端对应一个线程的IO模型了,大大减少服务端的开销;

    6.4.1 Selector改造多线程聊天案例

    1)服务端代码:

    package com.dfbz.selector;
    
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class Test1_selector改造网络聊天_服务端 {
    
        public static void main(String[] args) throws Exception {
    
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 9999));
    
            Selector selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
            System.out.println("等待客户端连接....");
    
            ByteBuffer buffer = ByteBuffer.allocate(1024);
    
            while (true) {
                if (selector.select() == 0) {
                    continue;
                }
    
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
    
                while (iterator.hasNext()) {
    
                    SelectionKey selectionKey = iterator.next();
    
                    if (selectionKey.isAcceptable()) {
                        // 代表有客户端连接了
                        SocketChannel socketChannel = serverSocketChannel.accept();       // 获取触发连接事件的那个客户端
                        socketChannel.configureBlocking(false);
    
                        // 注册读事件到Selector上
                        socketChannel.register(selector, SelectionKey.OP_READ);
    
                        System.out.println("客户端【" + socketChannel.getRemoteAddress() + "】连接成功..");
                    } else if (selectionKey.isReadable()) {
    
                        // 触发读事件,代表客户端发送信息到服务端了
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();       // 获取触发连接事件的那个客户端
    
                        socketChannel.read(buffer);
    
                       // limit=position,position=0
                        buffer.flip();
    
                        System.out.println("客户端【" + socketChannel.getRemoteAddress() + "】的数据: " + new String(buffer.array(), 0, buffer.array().length));
                    }
    
                    // 处理完逻辑后移除事件
                    iterator.remove();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    2)客户端代码:

    package com.dfbz.selector;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    import java.util.Scanner;
    
    public class Test2_selector改造网络聊天_客户端 {
        public static void main(String[] args) throws Exception {
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
    
            // 写线程
            new Thread() {
                @Override
                public void run() {
    
                    try {
                        Scanner scanner = new Scanner(System.in);
    
                        // 用于封装要发送给客户端的数据
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
    
                        while (true) {
                            String str = scanner.nextLine();
                            buffer.put(str.getBytes());
    
                            // limit=position, position=0
                            buffer.flip();
    
                            socketChannel.write(buffer);
    
                            // position=0,limit=capacity
                            buffer.clear();
                        }
                    } catch (IOException exception) {
                        exception.printStackTrace();
                    }
    
                }
            }.start();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    运行效果:

    在这里插入图片描述

  • 相关阅读:
    细粒度特征提取和定位用于目标检测:PPCNN
    Pyside2&PyQt5设置背景图片不影响其它控件
    django6返回数据库最新一条数据
    关于利用python进行文本读取的技巧
    Openssl生成证书-nginx使用ssl
    GO语言的由来与发展历程
    Python性能测试框架Locust实战教程
    终于来啦!DALL·E API公测开始;从加减乘除到机器学习·系列电子书;Python技术面试题库;新型AI图片库;前沿论文 | ShowMeAI资讯日报
    【C/PTA】函数专项练习(二)
    模型部署踩坑(持续更新ing)
  • 原文地址:https://blog.csdn.net/Bb15070047748/article/details/125612332