netty提供2种IO模型,nio线程模型、oio线程模型。这两个模型到底有什么区别? nio线程模型:有专门接受请求的线程,他只接受并不处理。 oio线程模型:线程接受任务后并且处理掉任务。nio线程模型、oio线程模型netty有提供线程池的概念。在oio模型处理接受请求的线程组可以并行处理任务,在nio线程模型有专门的boss线程组、worker线程组。所以,netty的线程模型是2类线程模型,4种线程模型可能性。使用oio线程模型的netty并发要比使用nio线程模型的差很多。
| IO类型 | IO形式 | 通讯特点 |
| oio线程模型 | oio单线程模型 | 同时只容许1个客户端与服务端通讯 |
| oio线程池模型 | 同时容许<=线程池数量的客户端数与服务端通讯 | |
| nio线程模型 | nio单线程模型 | 同时只容许1个客户端与服务端通讯,是否是netty BUG |
| nio线程池模型 | 可以同时容许多个客户端通讯 |
boss线程组、worker线程组都设置为单线程模式。
- /**
- * boss线程组
- */
- EventLoopGroup bossGroup = new OioEventLoopGroup(0,Executors.newSingleThreadExecutor());
-
- /**
- * worker线程组
- */
- EventLoopGroup workGroup = new OioEventLoopGroup(0,Executors.newSingleThreadExecutor());
-
-
- //设置nio类型的channel
- b.channel(OioServerSocketChannel.class);
从截图可以看出,oio单线程模型只容许一个客户端和服务端通讯。当有新服务端加入通讯,会被阻塞。

- //OioServerSocketChannel.java
- @Override
- protected int doReadMessages(List throws Exception {
- if (socket.isClosed()) {
- return -1;
- }
-
- try {
- //netty oio模型使用bio通讯的证据
- Socket s = socket.accept();
- try {
- //这块吧socket给线程池去处理,线程池线程数,可以自定义处理
- buf.add(new OioSocketChannel(this, s));
- return 1;
- } catch (Throwable t) {
- logger.warn("Failed to create a new channel from an accepted socket.", t);
- try {
- s.close();
- } catch (Throwable t2) {
- logger.warn("Failed to close a socket.", t2);
- }
- }
- } catch (SocketTimeoutException e) {
- // Expected
- }
- return 0;
- }
- }
这块就不演示了oio通过OioSocketChannel包装,所有的socket处理请求是OioSocketChannel处理,其处理委托给一个线程池处理。
从截图可以看出,oio多线程模型(具体2个线程)只容许2个客户端和服务端通讯。当有新服务端加入通讯,会被阻塞。

nio线程模型,其是基于jdk nio io模型的。但是nio模型EventLoopGroup work线程组设置不能是单线程,否则不支持同时处理多个客户端的能力。这在nio是支持的,这个可能是netty的一个bug?还搞不清楚。
- //AbstractNioChannel.java
- protected void doRegister() throws Exception {
- boolean selected = false;
- for (;;) {
- try {
- selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
- return;
- } catch (CancelledKeyException e) {
- if (!selected) {
- // Force the Selector to select now as the "canceled" SelectionKey may still be
- // cached and not removed because no Select.select(..) operation was called yet.
- eventLoop().selectNow();
- selected = true;
- } else {
- // We forced a select operation on the selector before but the SelectionKey is still cached
- // for whatever reason. JDK bug ?
- throw e;
- }
- }
- }
- }
netty其实就是一个单线程、多线程的外壳,包裹这jdk的 bio、nio实现。bio虽然是同步阻塞io,但是如果有多线程支持一样的可以做到同时支持多个客户端。nio天生就支持多个客户端,netty将接受请求线程、工作线程分开处理。
EventLoopGroup[boss、worke线程组] -->NioEventLoopGroup[nio线程组]-->NioEventLoop.openSelector()方法打开select监听.
- public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
- //boss线程继续执行
- super.group(parentGroup);
- if (this.childGroup != null) {
- throw new IllegalStateException("childGroup set already");
- }
- this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
- return this;
- }
-
- //获取selector,并对其进行优化,不同操作系统jdk实现不同
- //NioEventLoop.java
- private SelectorTuple openSelector() {
- final Selector unwrappedSelector;
- try {
- //创建java原生selector
- unwrappedSelector = provider.openSelector();
- } catch (IOException e) {
- throw new ChannelException("failed to open a new selector", e);
- }
-
- //是否需要优化,默认需要DISABLE_KEYSET_OPTIMIZATION=false
- if (DISABLE_KEYSET_OPTIMIZATION) {
- return new SelectorTuple(unwrappedSelector);
- }
-
- //尝试获取sun.nio.ch.SelectorImpl的class对象
- Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction
- @Override
- public Object run() {
- try {
- return Class.forName(
- "sun.nio.ch.SelectorImpl",
- false,
- PlatformDependent.getSystemClassLoader());
- } catch (Throwable cause) {
- return cause;
- }
- }
- });
-
- //如果返回maybeSelectorImplClass不是一个class对象,或者maybeSelectorImplClass不是(unwrappedSelector.getClass()他的子类。
- if (!(maybeSelectorImplClass instanceof Class) ||
- // ensure the current selector implementation is what we can instrument.
- !((Class>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
-
- //如果是异常说明上面方法抛出异常
- if (maybeSelectorImplClass instanceof Throwable) {
- Throwable t = (Throwable) maybeSelectorImplClass;
- logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
- }
- //创建一个SelectorTuple,内部unwrappedSelector并没有被优化
- return new SelectorTuple(unwrappedSelector);
- }
-
- //selector的class对象
- final Class> selectorImplClass = (Class>) maybeSelectorImplClass;
- //内部素组结构实现的set接口
- final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
-
- Object maybeException = AccessController.doPrivileged(new PrivilegedAction
- @Override
- public Object run() {
- try {
- //反射获取属性
- Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
- Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
-
- //设置属性可访问
- Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
- if (cause != null) {
- return cause;
- }
-
- //设置属性可访问
- cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
- if (cause != null) {
- return cause;
- }
-
- //把原来属性设置为selectedKeySet(它是素组实现),原来是hashmap实现
- selectedKeysField.set(unwrappedSelector, selectedKeySet);
- publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
- return null;
- } catch (NoSuchFieldException e) {
- return e;
- } catch (IllegalAccessException e) {
- return e;
- }
- }
- });
-
- //如果返回结果是异常信息
- if (maybeException instanceof Exception) {
- selectedKeys = null;
- Exception e = (Exception) maybeException;
- logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
- //返回SelectorTuple,内部unwrappedSelector并没有被优化
- return new SelectorTuple(unwrappedSelector);
- }
-
- //赋值
- selectedKeys = selectedKeySet;
- logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
-
- //unwrappedSelector内部的selectedKeys和publicSelectedKeys俩个数据结构已用素组优化
- //SelectedSelectionKeySetSelector包装了unwrappedSelector和selectedKeySet
- return new SelectorTuple(unwrappedSelector,
- new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
- }
- //boss select监听NioEventLoop.run()
- //NioEventLoop.java
- protected void run() {
- // loop,循环处理IO事件或者处理线程池中的task任务
- for (;;) {
- try {
- // 判断接下来是是执行select还是直接处理IO事件和执行队列中的task
- // hasTask判断当前线程的queue中是否还有待执行的任务
- switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
- case SelectStrategy.CONTINUE:
- // NioEventLoop默认不会有这种状态
- continue;
- case SelectStrategy.SELECT:
- // 说明当前queue中没有task待执行
- select(wakenUp.getAndSet(false));
- // 唤醒epoll_wait
- if (wakenUp.get()) {
- selector.wakeup();
- }
- // fall through
- default:
- }
-
- cancelledKeys = 0;
- needsToSelectAgain = false;
- // 这个比例是处理IO事件所需的时间和花费在处理task时间的比例
- final int ioRatio = this.ioRatio;
- if (ioRatio == 100) {
- // 如果比例是100,表示每次都处理完IO事件后,执行所有的task
- try {
- processSelectedKeys();
- } finally {
- // Ensure we always run tasks.
- // 保证能执行所有的task
- runAllTasks();
- }
- } else {
- // 记录处理IO事件开始的时间
- final long ioStartTime = System.nanoTime();
- try {
- // 处理IO事件
- processSelectedKeys();
- } finally {
- // Ensure we always run tasks.
- // 当前时间减去处理IO事件开始的时间就是处理IO事件花费的时间
- final long ioTime = System.nanoTime() - ioStartTime;
- // 执行task的时间taskTime就是ioTime * (100 - ioRatio) / ioRatio
- // 如果taskTime时间到了还有未执行的task,runAllTasks也会返回
- runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
- }
- }
- } catch (Throwable t) {
- handleLoopException(t);
- }
- // Always handle shutdown even if the loop processing threw an exception.
- try {
- // 如果已经shutdown则关闭所有资源
- if (isShuttingDown()) {
- closeAll();
- if (confirmShutdown()) {
- return;
- }
- }
- } catch (Throwable t) {
- handleLoopException(t);
- }
- }
- }
-
- // io.netty.channel.DefaultSelectStrategy#calculateStrategy
- public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
- // 如果还有task待执行则先执行selectNow,selectNow是立即返回的,不是阻塞等待
- // 如果没有待执行的task则执行select,有可能是阻塞等待IO事件
- return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
- }
-
- // io.netty.channel.nio.NioEventLoop#selectNowSupplier
- private final IntSupplier selectNowSupplier = new IntSupplier() {
- @Override
- public int get() throws Exception {
- // epoll_wait的参数timeout可以指定超时时间,selectNow传入的参数是0,也就是不超时等待立即返回
- return selectNow();
- }
- };
-
- //AbstractNioChannel.java
- protected void doRegister() throws Exception {
- boolean selected = false;
- for (;;) {
- try {
- //循环监听selectionKey状态
- selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
- return;
- } catch (CancelledKeyException e) {
- if (!selected) {
- // Force the Selector to select now as the "canceled" SelectionKey may still be
- // cached and not removed because no Select.select(..) operation was called yet.
- eventLoop().selectNow();
- selected = true;
- } else {
- // We forced a select operation on the selector before but the SelectionKey is still cached
- // for whatever reason. JDK bug ?
- throw e;
- }
- }
- }
- }
-
- //NioEventLoop.java
- private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
- final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
- if (!k.isValid()) {
- final EventLoop eventLoop;
- try {
- eventLoop = ch.eventLoop();
- } catch (Throwable ignored) {
- // If the channel implementation throws an exception because there is no event loop, we ignore this
- // because we are only trying to determine if ch is registered to this event loop and thus has authority
- // to close ch.
- return;
- }
- // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
- // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
- // still healthy and should not be closed.
- // See https://github.com/netty/netty/issues/5125
- if (eventLoop == this) {
- // close the channel if the key is not valid anymore
- unsafe.close(unsafe.voidPromise());
- }
- return;
- }
-
- try {
- int readyOps = k.readyOps();
- // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
- // the NIO JDK channel implementation may throw a NotYetConnectedException.
- if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
- // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
- // See https://github.com/netty/netty/issues/924
- int ops = k.interestOps();
- ops &= ~SelectionKey.OP_CONNECT;
- k.interestOps(ops);
-
- unsafe.finishConnect();
- }
-
- // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
- if ((readyOps & SelectionKey.OP_WRITE) != 0) {
- // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
- ch.unsafe().forceFlush();
- }
-
- // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
- // to a spin loop
- if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
- //这点事重点 io就绪调read()
- unsafe.read();
- }
- } catch (CancelledKeyException ignored) {
- unsafe.close(unsafe.voidPromise());
- }
- }
-
- //AbstractNioByteChannel.java
- @Override
- public final void read() {
- final ChannelConfig config = config();
- final ChannelPipeline pipeline = pipeline();
- // 用来处理内存的分配:池化或者非池化 UnpooledByteBufAllocator
- final ByteBufAllocator allocator = config.getAllocator();
- // 用来计算此次读循环应该分配多少内存 AdaptiveRecvByteBufAllocator 自适应计算缓冲分配
- final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
- allocHandle.reset(config);// 重置为0
-
- ByteBuf byteBuf = null;
- boolean close = false;
- try {
- do {
- byteBuf = allocHandle.allocate(allocator);//这里会根据上一次的读取情况进行自适应的调整大小
- allocHandle.lastBytesRead(doReadBytes(byteBuf));
- if (allocHandle.lastBytesRead() <= 0) {// 如果上一次读到的字节数小于等于0,清理引用和跳出循环
- // nothing was read. release the buffer.
- byteBuf.release();// 引用 -1
- byteBuf = null;
- close = allocHandle.lastBytesRead() < 0;// 如果远程已经关闭连接
- if (close) {
- // There is nothing left to read as we received an EOF.
- readPending = false;
- }
- break;
- }
-
- allocHandle.incMessagesRead(1);// totalMessages += amt;
- readPending = false;
- //
- pipeline.fireChannelRead(byteBuf);
- byteBuf = null;
- } while (allocHandle.continueReading());
-
- allocHandle.readComplete();
- pipeline.fireChannelReadComplete();
-
- if (close) {
- closeOnRead(pipeline);
- }
- } catch (Throwable t) {
- handleReadException(pipeline, byteBuf, t, close, allocHandle);
- } finally {
- if (!readPending && !config.isAutoRead()) {
- removeReadOp();
- }
- }
boss线程池创建打开select监听, NioEventLoop继承多线程ScheduledExecutorService,在run方法实现了获取nio selectKey状态的方法processSelectedKeys.后续调read()封装ByteBuf在保存在堆外内存.
将线程池注册到Channel通道上面,然后通过Channel通道获取线程池执行, TODO
- //ServerBootstrapAcceptor.java
- //childGroup 线程池监听通道
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- //channel对象构造
- final Channel child = (Channel) msg;
-
- child.pipeline().addLast(childHandler);
-
- setChannelOptions(child, childOptions, logger);
- setAttributes(child, childAttrs);
-
- try {
- //将child通道对象,分配给childGroup处理并监听结果
- childGroup.register(child).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- forceClose(child, future.cause());
- }
- }
- });
- } catch (Throwable t) {
- forceClose(child, t);
- }
- }
-
-
-
- //Channel对象获取worke线程池
- //AbstractChannelHandlerContext.java
- public EventExecutor executor() {
- if (executor == null) {
- return channel().eventLoop();
- } else {
- return executor;
- }
- }