Selector选择器,也叫多路复用器;NIO中实现非阻塞 I/O 的核心对象就是 Selector。当一个连接创建后,不需要创建一个线程来处理这个来连接,这个连接(管道)会被注册到选择器上,选择器可以检查一个或多个 NIO 通道(轮询),并确定哪些通道已经准备好进行读取或写入。这样,一个单独的线程可以管理多个channel,系统不必创建大量的线程,也不必维护这些线程,从而大大减小了系统的开销。

需要注意的是:不是所有的 Channel 都可以被 Selector 复用的。能被注册到Selector的Channel必须要实现SelectableChannel类;比如FileChannel就不支持被注册到Selector。另外,被注册到Selector的Channel都必须设置为非阻塞模式;
Channel注册到Selector后,由Selector来轮询监听所有被注册到该Selector的Channel所触发的事件,这个时候我们就需要关心Selector应该监听Channel触发的哪些事件;
SelectionKey.OP_READ(1):读事件,当被轮询的Channel读缓冲区有数据可读时触发。
SelectionKey.OP_WRITE(4):可写事件,当被轮询的Channel写缓冲区有空闲空间时触发。一般情况下写缓冲区都有空闲空间,小块数据直接写入即可,没必要注册该操作类型,否则该条件不断就绪浪费 CPU;但如果是写密集型的任务,比如文件下载等,缓冲区很可能满,注册该操作类型就很有必要,同时注意写完后取消注册。
SelectionKey.OP_CONNECT(8):连接事件,当被轮询到Channel成功连接到其他服务器时触发;
SelectionKey.OP_ACCEPT(16):接收事件,当被轮询到Channel接受到新的连接时触发;
Tips:可写事件比较特殊,一般我们不注册写事件,写操作的就绪条件为底层缓冲区有空闲空间,而写缓冲区绝大部分时间都是有空闲空间的;因此写操作一直是就绪状态的,所以,只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。
一旦向Selector注册了一或多个通道,就可以调用几个重载的select()方法。select()方法会返回已经准备就绪的通道数量;
int select():获取上一次调用select()方法后,进入就绪通道(包括连接就绪/接受就绪/读就绪/写就绪等)的数量;该方法会阻塞到至少有一个通道在你注册的事件上就绪了。int select(long timeout):和select()方法一样,阻塞timeout毫秒后还没有就绪的事件暂时释放一下线程(返回0),接着进行下一次阻塞int selectNow():不会阻塞,不管什么是否有事件就绪都立刻返回(返回0)测试代码:
package com.dfbz.selector;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class Demo01_count方法 {
@Test
public void server() throws Exception {
// 创建一个服务器
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 绑定地址
serverSocketChannel.bind(new InetSocketAddress(9999));
// 设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 获取一个选择器
Selector selector = Selector.open();
/*
注册事件:
* 读 : SelectionKey.OP_READ (1)
* 写 : SelectionKey.OP_WRITE (4)
* 连接 : SelectionKey.OP_CONNECT (8)
* 接收 : SelectionKey.OP_ACCEPT (16)
*/
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
/*
获取有多少通道已经就绪(包括连接就绪/接受就绪/读就绪/写就绪等)
select方法将会一直阻塞到至少要有一个通道在注册的事件上就绪(返回1),之后将会一直轮询调用select方法获取就绪的事件数量
*/
int count = selector.select();
System.out.println("count: " + count);
Thread.sleep(1000);
}
}
@Test
public void client() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
// 连接服务器
socketChannel.connect(new InetSocketAddress(9999));
System.out.println("连接成功");
}
}
我们前面了解到,Selector能注册的监听事件有四种(接收/连接/读/写),当Selector监听到某个事件后可以通过selectedKeys()方法获取那些活跃的事件;
并不是所有的Channel都支持注册所有的事件,下表描述各种 Channel 允许注册的操作类型,Y 表示允许注册,其中服务器 SocketChannel 指由服务器 ServerSocketChannel.accept()返回的对象。
| 项目 | OP_READ | OP_WRITE | OP_CONNECT | OP_ACCEPT |
|---|---|---|---|---|
| 服务器 ServerSocketChannel | Y | |||
| 服务器 SocketChannel | Y | Y | ||
| 客户端 SocketChannel | Y | Y | Y |
public Set<SelectionKey> selectedKeys():获取Selector监听到的事件集合。一个SelectionKey对象代表一个监听事件;SelectionKey监听具体事件的方法:
public boolean isAcceptable():监听接收事件,当客户端连接到服务端时,服务端的接收事件将被触发;public boolean isConnectable():监听连接事件,当客户端连接到服务端时,客户端的连接事件将被触发;public boolean isReadable():监听读事件,当客户端向服务端写出数据时,服务端的SocketChannel将触发可读数据;public boolean isWritable():监听写事件,当被轮询的Channel写缓冲区有空闲空间时触发(一般情况下都会触发)public boolean isValid():判断当前这个通道是否是有效的;只有ServerSocketChannel才可以注册Accept事件,因为只有ServerSocketChannel才可以接收其他Channel的请求;
package com.dfbz.selector;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Demo02_监听接收事件_服务端 {
public static void main(String[] args) throws Exception {
// 创建一个服务器
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 绑定地址
serverSocketChannel.bind(new InetSocketAddress(9999));
// 设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 获取一个选择器
Selector selector = Selector.open();
/*
注册事件:
* 读 : SelectionKey.OP_READ (1)
* 写 : SelectionKey.OP_WRITE (4)
* 连接 : SelectionKey.OP_CONNECT (8)
* 接收 : SelectionKey.OP_ACCEPT (16): 是否有新的连接
*/
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 获取有多少通道已经准备就绪
int count = selector.select();
System.out.println(count);
// 获取那些已经准备就绪的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
// 迭代获取每一个事件
SelectionKey event = iterator.next();
if (event.isAcceptable()) {
// 接收就绪事件(当有客户端连接到服务器时触发)
System.out.println("isAcceptable");
}
}
Thread.sleep(1000);
}
}
}
package com.dfbz.selector;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
public class Demo03_监听接收事件_客户端 {
public static void main(String[] args) throws Exception{
SocketChannel socketChannel = SocketChannel.open();
// 连接服务器
socketChannel.connect(new InetSocketAddress(9999));
}
}
再上述案例中,当服务器端的Selector监听到了接收事件时(客户端连接服务器时),select方法将会返回1,并且服务端通过调用Selector的selectedKeys()方法可以获取监听到的具体事件;这里存在两个问题:
服务端改进代码:
package com.dfbz.selector;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Demo02_监听接收事件_服务端 {
public static void main(String[] args) throws Exception {
// 创建一个服务器
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 绑定地址
serverSocketChannel.bind(new InetSocketAddress(9999));
// 设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 获取一个选择器
Selector selector = Selector.open();
/*
注册事件:
* 读 : SelectionKey.OP_READ (1)
* 写 : SelectionKey.OP_WRITE (4)
* 连接 : SelectionKey.OP_CONNECT (8)
* 接收 : SelectionKey.OP_ACCEPT (16): 是否有新的连接
*/
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 获取有多少通道已经准备就绪
int count = selector.select();
System.out.println(count);
/*if (count == 0) {
// 代表还没有准备就绪的事件
continue;
}*/
// 获取那些已经准备就绪的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
// 迭代获取每一个事件
SelectionKey event = iterator.next();
if (event.isAcceptable()) {
// 接收就绪事件(当有客户端连接到服务器时触发)
System.out.println("isAcceptable");
// 接收客户端(如果不接收,那么selector将一直监听到serverSocketChannel的接收事件)
SocketChannel socketChannel = serverSocketChannel.accept();
}
/*
处理完事件后记得要移除事件,否则该一直存在selectedKeys集合中
如果不移除,下次调用selectedKeys()方法时,即使没有触发该事件也能获取到该事件
*/
iterator.remove();
}
Thread.sleep(1000);
}
}
}
只有客户端的SocketChannel才可以监听Connect事件,因为只有客户端的SocketChanel才可以连接其他Channel(连接服务端);
package com.dfbz.selector;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
public class Demo04_监听连接事件_服务端 {
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8888));
while (true) {
serverSocketChannel.accept();
}
}
}
package com.dfbz.selector;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Demo05_监听连接事件_客户端 {
public static void main(String[] args) throws Exception {
// 获取一个客户端的socketChannel
SocketChannel socketChannel = SocketChannel.open();
// 设置为非阻塞面模式
socketChannel.configureBlocking(false);
// 获取一个Selector选择器
Selector selector = Selector.open();
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 开启一个新的线程来开启监听任务
new Thread() {
@Override
public void run() {
try {
while (true) {
// 当selector监听到具体的事件后
int count = selector.select();
if (count == 0) {
continue;
}
// 获取监听的事件集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 迭代遍历每一个事件
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 判断是否是触发连接事件
if (selectionKey.isConnectable()) {
System.out.println("isConnectable");
}
// 移除事件
iterator.remove();
}
Thread.sleep(1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}.start();
// 连接服务器(触发连接事件)
socketChannel.connect(new InetSocketAddress(9999));
}
}
客户端和服务端的SocketChannel都可以监听Read事件,但ServerSocketChannel不可以监听读事件;
package com.dfbz.selector;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Demo06_监听服务端的Read事件_服务端 {
public static void main(String[] args) throws Exception {
// 创建一个服务器
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(7777));
System.out.println("等待客户端连接...");
// 接收到一个客户端
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端【" + socketChannel.getRemoteAddress() + "】连接成功");
// 将客户端的channel设置为非阻塞模式(需要注册到Selector)
socketChannel.configureBlocking(false);
// 创建一个Selector选择器
Selector selector = Selector.open();
// 将客户的Channel注册到选择器上,并让选择器监听该socketChanel的读事件(当客户端发送数据来时将触发)
socketChannel.register(selector, SelectionKey.OP_READ);
// 创建一个buffer用来接收客户端发送过来的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
// 是否有事件就绪
int count = selector.select();
if (count == 0) {
continue;
}
// 获取触发的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 迭代每个事件
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 判断是否触发可读事件
if (selectionKey.isReadable()) {
System.out.println("触发读事件啦,客户端发送过来的数据是: ");
socketChannel.read(buffer);
buffer.clear();
System.out.println(new String(buffer.array(), 0, buffer.array().length));
}
iterator.remove();
}
}
}
}
package com.dfbz.selector;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
public class Demo07_监听客户端的Read事件_客户端 {
public static void main(String[] args) throws Exception {
// 获取一个客户端的socketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress(7777));
Scanner scanner = new Scanner(System.in);
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true){
String str = scanner.nextLine();
buffer.put(str.getBytes());
// limit=position,position=0
buffer.flip();
// 将数据写道服务器的SocketChannel(服务器的SocketChannel触发读事件)
socketChannel.write(buffer); // socketChannel会把数据从buffer中读出来,造成position位移
// position=0,limit=capacity
buffer.clear();
}
}
}
package com.dfbz.selector;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
public class Demo08_监听服务端的Read事件_服务端 {
public static void main(String[] args) throws Exception {
// 创建一个服务器
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(7777));
System.out.println("等待客户端连接...");
// 接收到一个客户端
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端【" + socketChannel.getRemoteAddress() + "】连接成功");
// 创建Buffer用于存储要发送给客户端的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
while (true){
String str = scanner.nextLine();
buffer.put(str.getBytes());
// limit=position,position=0
buffer.flip();
// 将数据写道客户端的SocketChannel(客户端的SocketChannel触发读事件)
socketChannel.write(buffer); // socketChannel会把数据从buffer中读出来,造成position位移
// position=0,limit=capacity
buffer.clear();
}
}
}
package com.dfbz.selector;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Demo09_监听客户端的Read事件_客户端 {
public static void main(String[] args) throws Exception {
// 获取一个客户端的socketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress(7777));
socketChannel.configureBlocking(false);
// 创建选择器
Selector selector = Selector.open();
// 注册读事件(当服务端有数据写到客户端时触发)
socketChannel.register(selector, SelectionKey.OP_READ);
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
// 是否有事件就绪
int count = selector.select();
if (count == 0) {
continue;
}
// 获取触发的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 迭代每个事件
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 判断是否触发可读事件
if (selectionKey.isReadable()) {
System.out.println("触发读事件啦,服务端发送过来的数据是: ");
socketChannel.read(buffer);
buffer.clear();
System.out.println(new String(buffer.array(), 0, buffer.array().length));
}
iterator.remove();
}
}
}
}
服务端和客户端的SocketChannel都可以注册写事件,当SocketChannel写就绪时触发该事件,默认情况下,写一直都处于就绪状态,因此一旦SocketChannel监听了写事件,Selector的select将永远返回1(事件永远准备就绪);
package com.dfbz.selector;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Demo10_监听服务端的Write事件_服务端 {
public static void main(String[] args) throws Exception {
// 创建一个服务器
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(6666));
System.out.println("等待客户端连接...");
// 接收到一个客户端
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端【" + socketChannel.getRemoteAddress() + "】连接成功");
// 将客户端的channel设置为非阻塞模式(需要注册到Selector)
socketChannel.configureBlocking(false);
// 创建一个Selector选择器
Selector selector = Selector.open();
// 将socketChannel注册到Selector上,并监听写事件
socketChannel.register(selector, SelectionKey.OP_WRITE);
while (true) {
// 是否有事件就绪
int count = selector.select();
if (count == 0) {
continue;
}
// 获取触发的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
// 迭代每个事件
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 判断是否触发可读事件(默认情况下一直是处于可写状态)
if (selectionKey.isWritable()) {
System.out.println("isWritable");
}
iterator.remove();
Thread.sleep(1000);
}
}
}
}
package com.dfbz.selector;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
public class Demo11_监听服务端的Write事件_客户端 {
public static void main(String[] args) throws Exception{
SocketChannel socketChannel = SocketChannel.open();
// 连接服务器(服务器的接收事件将要触发)
socketChannel.connect(new InetSocketAddress(6666));
}
}
Tips:客户端的SocketChannel也可以监听写事件,代码和服务端的SocketChannel一致,这里就不举例了;
Selector常用方法:
public abstract Set<SelectionKey> selectedKeys()public abstract Set<SelectionKey> keys()public abstract void close()public abstract boolean isOpen()public abstract Selector wakeup()public abstract SelectorProvider provider()SelectionKey常用方法:
public abstract SelectableChannel channel()public abstract int interestOps()public abstract SelectionKey interestOps(int ops)public abstract int readyOps()public abstract void cancel()public final Object attach(Object ob)public final Object attachment()public abstract Selector selector()Selector正是Java NIO实现IO多路复用的核心组件,当Channel都被注册到Selector后,服务端只需要一个线程来轮询查看注册到Selector的哪些Channel,如果有准备就绪的事件将会执行;这样一来就可以解决我们传统情况下的一个客户端对应一个线程的IO模型了,大大减少服务端的开销;
package com.dfbz.selector;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class Test1_selector改造网络聊天_服务端 {
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 9999));
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("等待客户端连接....");
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
if (selector.select() == 0) {
continue;
}
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
// 代表有客户端连接了
SocketChannel socketChannel = serverSocketChannel.accept(); // 获取触发连接事件的那个客户端
socketChannel.configureBlocking(false);
// 注册读事件到Selector上
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("客户端【" + socketChannel.getRemoteAddress() + "】连接成功..");
} else if (selectionKey.isReadable()) {
// 触发读事件,代表客户端发送信息到服务端了
SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); // 获取触发连接事件的那个客户端
socketChannel.read(buffer);
// limit=position,position=0
buffer.flip();
System.out.println("客户端【" + socketChannel.getRemoteAddress() + "】的数据: " + new String(buffer.array(), 0, buffer.array().length));
}
// 处理完逻辑后移除事件
iterator.remove();
}
}
}
}
package com.dfbz.selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
public class Test2_selector改造网络聊天_客户端 {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
// 写线程
new Thread() {
@Override
public void run() {
try {
Scanner scanner = new Scanner(System.in);
// 用于封装要发送给客户端的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
String str = scanner.nextLine();
buffer.put(str.getBytes());
// limit=position, position=0
buffer.flip();
socketChannel.write(buffer);
// position=0,limit=capacity
buffer.clear();
}
} catch (IOException exception) {
exception.printStackTrace();
}
}
}.start();
}
}
运行效果:
