• 4、Netty 线程模型


    netty提供2种IO模型,nio线程模型、oio线程模型。这两个模型到底有什么区别? nio线程模型:有专门接受请求的线程,他只接受并不处理。 oio线程模型:线程接受任务后并且处理掉任务。nio线程模型、oio线程模型netty有提供线程池的概念。在oio模型处理接受请求的线程组可以并行处理任务,在nio线程模型有专门的boss线程组、worker线程组。所以,netty的线程模型是2类线程模型,4种线程模型可能性。使用oio线程模型的netty并发要比使用nio线程模型的差很多。

    IO类型

    IO形式

    通讯特点

    oio线程模型

    oio单线程模型

    同时只容许1个客户端与服务端通讯

    oio线程池模型

    同时容许<=线程池数量的客户端数与服务端通讯

    nio线程模型

    nio单线程模型

    同时只容许1个客户端与服务端通讯,是否是netty BUG

    nio线程池模型

    可以同时容许多个客户端通讯

    一、OIO单线程模型

    1.1、如何实现netty oio单线程模型?

    boss线程组、worker线程组都设置为单线程模式。

    1. /**
    2. * boss线程组
    3. */
    4. EventLoopGroup bossGroup = new OioEventLoopGroup(0,Executors.newSingleThreadExecutor());
    5. /**
    6. * worker线程组
    7. */
    8. EventLoopGroup workGroup = new OioEventLoopGroup(0,Executors.newSingleThreadExecutor());
    9. //设置nio类型的channel
    10. b.channel(OioServerSocketChannel.class);

    1.2、OIO单线程模型效果

    从截图可以看出,oio单线程模型只容许一个客户端和服务端通讯。当有新服务端加入通讯,会被阻塞。

    1.3、OIO线程模型如何实现?

    1. //OioServerSocketChannel.java
    2. @Override
    3. protected int doReadMessages(List buf) throws Exception {
    4. if (socket.isClosed()) {
    5. return -1;
    6. }
    7. try {
    8. //netty oio模型使用bio通讯的证据
    9. Socket s = socket.accept();
    10. try {
    11. //这块吧socket给线程池去处理,线程池线程数,可以自定义处理
    12. buf.add(new OioSocketChannel(this, s));
    13. return 1;
    14. } catch (Throwable t) {
    15. logger.warn("Failed to create a new channel from an accepted socket.", t);
    16. try {
    17. s.close();
    18. } catch (Throwable t2) {
    19. logger.warn("Failed to close a socket.", t2);
    20. }
    21. }
    22. } catch (SocketTimeoutException e) {
    23. // Expected
    24. }
    25. return 0;
    26. }
    27. }
    28. 二、oio线程池模型

      这块就不演示了oio通过OioSocketChannel包装,所有的socket处理请求是OioSocketChannel处理,其处理委托给一个线程池处理。

      2.2、oio多线程模型效果

      从截图可以看出,oio多线程模型(具体2个线程)只容许2个客户端和服务端通讯。当有新服务端加入通讯,会被阻塞。

      三、nio线程模型

      nio线程模型,其是基于jdk nio io模型的。但是nio模型EventLoopGroup work线程组设置不能是单线程,否则不支持同时处理多个客户端的能力。这在nio是支持的,这个可能是netty的一个bug?还搞不清楚。

      1. //AbstractNioChannel.java
      2. protected void doRegister() throws Exception {
      3. boolean selected = false;
      4. for (;;) {
      5. try {
      6. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
      7. return;
      8. } catch (CancelledKeyException e) {
      9. if (!selected) {
      10. // Force the Selector to select now as the "canceled" SelectionKey may still be
      11. // cached and not removed because no Select.select(..) operation was called yet.
      12. eventLoop().selectNow();
      13. selected = true;
      14. } else {
      15. // We forced a select operation on the selector before but the SelectionKey is still cached
      16. // for whatever reason. JDK bug ?
      17. throw e;
      18. }
      19. }
      20. }
      21. }

      四、netty线程模型总结

      netty其实就是一个单线程、多线程的外壳,包裹这jdk的 bio、nio实现。bio虽然是同步阻塞io,但是如果有多线程支持一样的可以做到同时支持多个客户端。nio天生就支持多个客户端,netty将接受请求线程、工作线程分开处理。

      五、boss线程调用链

      5.1、boss线程池初始化

      EventLoopGroup[boss、worke线程组] -->NioEventLoopGroup[nio线程组]-->NioEventLoop.openSelector()方法打开select监听.

      1. public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
      2. //boss线程继续执行
      3. super.group(parentGroup);
      4. if (this.childGroup != null) {
      5. throw new IllegalStateException("childGroup set already");
      6. }
      7. this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
      8. return this;
      9. }
      10. //获取selector,并对其进行优化,不同操作系统jdk实现不同
      11. //NioEventLoop.java
      12. private SelectorTuple openSelector() {
      13. final Selector unwrappedSelector;
      14. try {
      15. //创建java原生selector
      16. unwrappedSelector = provider.openSelector();
      17. } catch (IOException e) {
      18. throw new ChannelException("failed to open a new selector", e);
      19. }
      20. //是否需要优化,默认需要DISABLE_KEYSET_OPTIMIZATION=false
      21. if (DISABLE_KEYSET_OPTIMIZATION) {
      22. return new SelectorTuple(unwrappedSelector);
      23. }
      24. //尝试获取sun.nio.ch.SelectorImpl的class对象
      25. Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction() {
      26. @Override
      27. public Object run() {
      28. try {
      29. return Class.forName(
      30. "sun.nio.ch.SelectorImpl",
      31. false,
      32. PlatformDependent.getSystemClassLoader());
      33. } catch (Throwable cause) {
      34. return cause;
      35. }
      36. }
      37. });
      38. //如果返回maybeSelectorImplClass不是一个class对象,或者maybeSelectorImplClass不是(unwrappedSelector.getClass()他的子类。
      39. if (!(maybeSelectorImplClass instanceof Class) ||
      40. // ensure the current selector implementation is what we can instrument.
      41. !((Class) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
      42. //如果是异常说明上面方法抛出异常
      43. if (maybeSelectorImplClass instanceof Throwable) {
      44. Throwable t = (Throwable) maybeSelectorImplClass;
      45. logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
      46. }
      47. //创建一个SelectorTuple,内部unwrappedSelector并没有被优化
      48. return new SelectorTuple(unwrappedSelector);
      49. }
      50. //selector的class对象
      51. final Class selectorImplClass = (Class) maybeSelectorImplClass;
      52. //内部素组结构实现的set接口
      53. final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
      54. Object maybeException = AccessController.doPrivileged(new PrivilegedAction() {
      55. @Override
      56. public Object run() {
      57. try {
      58. //反射获取属性
      59. Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
      60. Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
      61. //设置属性可访问
      62. Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
      63. if (cause != null) {
      64. return cause;
      65. }
      66. //设置属性可访问
      67. cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
      68. if (cause != null) {
      69. return cause;
      70. }
      71. //把原来属性设置为selectedKeySet(它是素组实现),原来是hashmap实现
      72. selectedKeysField.set(unwrappedSelector, selectedKeySet);
      73. publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
      74. return null;
      75. } catch (NoSuchFieldException e) {
      76. return e;
      77. } catch (IllegalAccessException e) {
      78. return e;
      79. }
      80. }
      81. });
      82. //如果返回结果是异常信息
      83. if (maybeException instanceof Exception) {
      84. selectedKeys = null;
      85. Exception e = (Exception) maybeException;
      86. logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
      87. //返回SelectorTuple,内部unwrappedSelector并没有被优化
      88. return new SelectorTuple(unwrappedSelector);
      89. }
      90. //赋值
      91. selectedKeys = selectedKeySet;
      92. logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
      93. //unwrappedSelector内部的selectedKeys和publicSelectedKeys俩个数据结构已用素组优化
      94. //SelectedSelectionKeySetSelector包装了unwrappedSelector和selectedKeySet
      95. return new SelectorTuple(unwrappedSelector,
      96. new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
      97. }
      98. 5.2、boss select监听

        1. //boss select监听NioEventLoop.run()
        2. //NioEventLoop.java
        3. protected void run() {
        4. // loop,循环处理IO事件或者处理线程池中的task任务
        5. for (;;) {
        6. try {
        7. // 判断接下来是是执行select还是直接处理IO事件和执行队列中的task
        8. // hasTask判断当前线程的queue中是否还有待执行的任务
        9. switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
        10. case SelectStrategy.CONTINUE:
        11. // NioEventLoop默认不会有这种状态
        12. continue;
        13. case SelectStrategy.SELECT:
        14. // 说明当前queue中没有task待执行
        15. select(wakenUp.getAndSet(false));
        16. // 唤醒epoll_wait
        17. if (wakenUp.get()) {
        18. selector.wakeup();
        19. }
        20. // fall through
        21. default:
        22. }
        23. cancelledKeys = 0;
        24. needsToSelectAgain = false;
        25. // 这个比例是处理IO事件所需的时间和花费在处理task时间的比例
        26. final int ioRatio = this.ioRatio;
        27. if (ioRatio == 100) {
        28. // 如果比例是100,表示每次都处理完IO事件后,执行所有的task
        29. try {
        30. processSelectedKeys();
        31. } finally {
        32. // Ensure we always run tasks.
        33. // 保证能执行所有的task
        34. runAllTasks();
        35. }
        36. } else {
        37. // 记录处理IO事件开始的时间
        38. final long ioStartTime = System.nanoTime();
        39. try {
        40. // 处理IO事件
        41. processSelectedKeys();
        42. } finally {
        43. // Ensure we always run tasks.
        44. // 当前时间减去处理IO事件开始的时间就是处理IO事件花费的时间
        45. final long ioTime = System.nanoTime() - ioStartTime;
        46. // 执行task的时间taskTime就是ioTime * (100 - ioRatio) / ioRatio
        47. // 如果taskTime时间到了还有未执行的task,runAllTasks也会返回
        48. runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        49. }
        50. }
        51. } catch (Throwable t) {
        52. handleLoopException(t);
        53. }
        54. // Always handle shutdown even if the loop processing threw an exception.
        55. try {
        56. // 如果已经shutdown则关闭所有资源
        57. if (isShuttingDown()) {
        58. closeAll();
        59. if (confirmShutdown()) {
        60. return;
        61. }
        62. }
        63. } catch (Throwable t) {
        64. handleLoopException(t);
        65. }
        66. }
        67. }
        68. // io.netty.channel.DefaultSelectStrategy#calculateStrategy
        69. public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        70. // 如果还有task待执行则先执行selectNow,selectNow是立即返回的,不是阻塞等待
        71. // 如果没有待执行的task则执行select,有可能是阻塞等待IO事件
        72. return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
        73. }
        74. // io.netty.channel.nio.NioEventLoop#selectNowSupplier
        75. private final IntSupplier selectNowSupplier = new IntSupplier() {
        76. @Override
        77. public int get() throws Exception {
        78. // epoll_wait的参数timeout可以指定超时时间,selectNow传入的参数是0,也就是不超时等待立即返回
        79. return selectNow();
        80. }
        81. };
        82. //AbstractNioChannel.java
        83. protected void doRegister() throws Exception {
        84. boolean selected = false;
        85. for (;;) {
        86. try {
        87. //循环监听selectionKey状态
        88. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
        89. return;
        90. } catch (CancelledKeyException e) {
        91. if (!selected) {
        92. // Force the Selector to select now as the "canceled" SelectionKey may still be
        93. // cached and not removed because no Select.select(..) operation was called yet.
        94. eventLoop().selectNow();
        95. selected = true;
        96. } else {
        97. // We forced a select operation on the selector before but the SelectionKey is still cached
        98. // for whatever reason. JDK bug ?
        99. throw e;
        100. }
        101. }
        102. }
        103. }
        104. //NioEventLoop.java
        105. private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        106. final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        107. if (!k.isValid()) {
        108. final EventLoop eventLoop;
        109. try {
        110. eventLoop = ch.eventLoop();
        111. } catch (Throwable ignored) {
        112. // If the channel implementation throws an exception because there is no event loop, we ignore this
        113. // because we are only trying to determine if ch is registered to this event loop and thus has authority
        114. // to close ch.
        115. return;
        116. }
        117. // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        118. // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        119. // still healthy and should not be closed.
        120. // See https://github.com/netty/netty/issues/5125
        121. if (eventLoop == this) {
        122. // close the channel if the key is not valid anymore
        123. unsafe.close(unsafe.voidPromise());
        124. }
        125. return;
        126. }
        127. try {
        128. int readyOps = k.readyOps();
        129. // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        130. // the NIO JDK channel implementation may throw a NotYetConnectedException.
        131. if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        132. // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
        133. // See https://github.com/netty/netty/issues/924
        134. int ops = k.interestOps();
        135. ops &= ~SelectionKey.OP_CONNECT;
        136. k.interestOps(ops);
        137. unsafe.finishConnect();
        138. }
        139. // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        140. if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        141. // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
        142. ch.unsafe().forceFlush();
        143. }
        144. // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        145. // to a spin loop
        146. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        147. //这点事重点 io就绪调read()
        148. unsafe.read();
        149. }
        150. } catch (CancelledKeyException ignored) {
        151. unsafe.close(unsafe.voidPromise());
        152. }
        153. }
        154. //AbstractNioByteChannel.java
        155. @Override
        156. public final void read() {
        157. final ChannelConfig config = config();
        158. final ChannelPipeline pipeline = pipeline();
        159. // 用来处理内存的分配:池化或者非池化 UnpooledByteBufAllocator
        160. final ByteBufAllocator allocator = config.getAllocator();
        161. // 用来计算此次读循环应该分配多少内存 AdaptiveRecvByteBufAllocator 自适应计算缓冲分配
        162. final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        163. allocHandle.reset(config);// 重置为0
        164. ByteBuf byteBuf = null;
        165. boolean close = false;
        166. try {
        167. do {
        168. byteBuf = allocHandle.allocate(allocator);//这里会根据上一次的读取情况进行自适应的调整大小
        169. allocHandle.lastBytesRead(doReadBytes(byteBuf));
        170. if (allocHandle.lastBytesRead() <= 0) {// 如果上一次读到的字节数小于等于0,清理引用和跳出循环
        171. // nothing was read. release the buffer.
        172. byteBuf.release();// 引用 -1
        173. byteBuf = null;
        174. close = allocHandle.lastBytesRead() < 0;// 如果远程已经关闭连接
        175. if (close) {
        176. // There is nothing left to read as we received an EOF.
        177. readPending = false;
        178. }
        179. break;
        180. }
        181. allocHandle.incMessagesRead(1);// totalMessages += amt;
        182. readPending = false;
        183. //
        184. pipeline.fireChannelRead(byteBuf);
        185. byteBuf = null;
        186. } while (allocHandle.continueReading());
        187. allocHandle.readComplete();
        188. pipeline.fireChannelReadComplete();
        189. if (close) {
        190. closeOnRead(pipeline);
        191. }
        192. } catch (Throwable t) {
        193. handleReadException(pipeline, byteBuf, t, close, allocHandle);
        194. } finally {
        195. if (!readPending && !config.isAutoRead()) {
        196. removeReadOp();
        197. }
        198. }

        5.3、boss线程池总结

        boss线程池创建打开select监听, NioEventLoop继承多线程ScheduledExecutorService,在run方法实现了获取nio selectKey状态的方法processSelectedKeys.后续调read()封装ByteBuf在保存在堆外内存.



        六、worker线程池调用链


        6.1、worker线程初始化


        将线程池注册到Channel通道上面,然后通过Channel通道获取线程池执行, TODO

        1. //ServerBootstrapAcceptor.java
        2. //childGroup 线程池监听通道
        3. public void channelRead(ChannelHandlerContext ctx, Object msg) {
        4. //channel对象构造
        5. final Channel child = (Channel) msg;
        6. child.pipeline().addLast(childHandler);
        7. setChannelOptions(child, childOptions, logger);
        8. setAttributes(child, childAttrs);
        9. try {
        10. //将child通道对象,分配给childGroup处理并监听结果
        11. childGroup.register(child).addListener(new ChannelFutureListener() {
        12. @Override
        13. public void operationComplete(ChannelFuture future) throws Exception {
        14. if (!future.isSuccess()) {
        15. forceClose(child, future.cause());
        16. }
        17. }
        18. });
        19. } catch (Throwable t) {
        20. forceClose(child, t);
        21. }
        22. }
        23. //Channel对象获取worke线程池
        24. //AbstractChannelHandlerContext.java
        25. public EventExecutor executor() {
        26. if (executor == null) {
        27. return channel().eventLoop();
        28. } else {
        29. return executor;
        30. }
        31. }


         

      99. 相关阅读:
        Golang一日一库之bcrypt
        【 与百度搜索相同的bootstrap4与5自动补全功能(autocomplete)】
        【论文阅读】Q8BERT: Quantized 8Bit BERT
        【扩散模型】5、Diffusion models beat GAN | 使用类别引导图像生成
        Android frameworks学习
        Linux 日志系统、auditd用户审计、kdump故障定位
        2022英特尔® FPGA中国技术周
        U盘里文件损坏无法打开怎么恢复?
        ITextRenderer将PDF转换为HTML详细教程
        LeetCode 452. 用最少数量的箭引爆气球
      100. 原文地址:https://blog.csdn.net/god8816/article/details/127774067