• netty源码看不懂?试着写一个吧


    前言

    最近一直在看netty源码,观后感:很难看,于是为了屡清netty的设计思路,我参照netty源码手写一个山寨简版的“netty”,说是手写,其实也就是从源码复制出来核心的代码,并尽量保持命名,设计结构与源码基本一致,因为我的目的很明确:尝试以作者的角度理解netty的全貌

    效果

    最终山寨版的netty代码server端使用如下(代码没有引用任何netty的依赖)

    1. public static void main(String[] args) {
    2. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    3. EventLoopGroup workerGroup = new NioEventLoopGroup(4);
    4. try {
    5. ServerBootstrap bootstrap = new ServerBootstrap();
    6. bootstrap.group(bossGroup, workerGroup)
    7. .channel(NioServerSocketChannel.class)
    8. .childHandler(new NettyServerHandler(), new NettyServerHandler2());
    9. System.out.println("netty server start...");
    10. bootstrap.bind(9000);
    11. } finally {
    12. }
    13. }
    14. 复制代码

    其中NettyHandler:

    1. public class NettyServerHandler implements ChannelInboundHandler {
    2. @Override
    3. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    4. }
    5. @Override
    6. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    7. System.out.println("thread: " + Thread.currentThread().getName());
    8. System.out.println("msg:" + new String(((ByteBuffer) msg).array()));
    9. // 传递给下个handler
    10. ctx.fireChannelRead(msg);
    11. }
    12. }
    13. 复制代码

    看起来和真的netty其实差不多,最终执行的效果也比较符合预期

    Netty与NIO

    首先,在学习netty前,要了解它是干嘛的,从我们常使用的场景上看可以说NETTY其实就是对NIO编程的一种封装,所以在理解netty之前,nio的基础知识是必须要掌握的,下面是一个NIO编程的基础server端代码:

    1. public class NioServer {
    2. public static void main(String[] args) throws IOException, InterruptedException {
    3. // 创建NIO ServerSocketChannel
    4. ServerSocketChannel serverSocket = ServerSocketChannel.open();
    5. serverSocket.socket().bind(new InetSocketAddress(9000));
    6. serverSocket.configureBlocking(false);
    7. // 打开Selector处理Channel,底层调用epoll_create
    8. Selector selector = Selector.open();
    9. // 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣,底层调用epoll_ctl
    10. // SelectionKey registerKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    11. SelectionKey serverRegisterKey = serverSocket.register(selector, 0);
    12. serverRegisterKey.interestOps(SelectionKey.OP_ACCEPT);
    13. // 测试attach
    14. serverRegisterKey.attach(new NioServer());
    15. System.out.println("服务启动成功");
    16. while (true) {
    17. // 阻塞等待需要处理的事件发生,即调用epoll_wait
    18. selector.select();
    19. // 获取selector中注册的全部事件的 SelectionKey 实例
    20. Set<SelectionKey> selectionKeys = selector.selectedKeys();
    21. Iterator<SelectionKey> iterator = selectionKeys.iterator();
    22. // 遍历SelectionKey对事件进行处理
    23. while (iterator.hasNext()) {
    24. SelectionKey key = iterator.next();
    25. // 测试attachment
    26. System.out.println("attachment: "+ key.attachment());
    27. // 测试key
    28. System.out.println("key is serverRegisterKey:" + key.equals(serverRegisterKey));
    29. // 如果是OP_ACCEPT事件,则进行连接获取和事件注册
    30. if (key.isAcceptable()) {
    31. ServerSocketChannel server = (ServerSocketChannel) key.channel();
    32. SocketChannel socketChannel = server.accept();
    33. socketChannel.configureBlocking(false);
    34. // 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件,底层调用epoll_ctl
    35. socketChannel.register(selector, SelectionKey.OP_READ);
    36. System.out.println("客户端连接成功");
    37. } else if (key.isReadable()) { // 如果是OP_READ事件,则进行读取和打印
    38. SocketChannel socketChannel = (SocketChannel) key.channel();
    39. ByteBuffer byteBuffer = ByteBuffer.allocate(128);
    40. int len = socketChannel.read(byteBuffer);
    41. // 如果有数据,把数据打印出来
    42. if (len > 0) {
    43. System.out.println("接收到消息:" + new String(byteBuffer.array()));
    44. } else if (len == -1) { // 如果客户端断开连接,关闭Socket
    45. System.out.println("客户端断开连接");
    46. socketChannel.close();
    47. }
    48. }
    49. //从事件集合里删除本次处理的key,防止下次select重复处理
    50. iterator.remove();
    51. }
    52. }
    53. }
    54. }
    55. 复制代码

    如果这里看不懂,可以参考Netty-理解selector是什么

    上面的服务端代码有几个问题:

    • 每次写基本都是大致固定的写法,很多代码可以封装
    • 所有的请求都交给同一个线程处理,强求量一大就会出现问题
    • 写起来非常蹩脚

    而netty的存在也就是为了解决这些问题,通过封装让NIO的编程变的简单易用

    所以下面我们就假装以作者的设计角度,尝试着写一个"山寨"的netty

    总体思路

    首先我们要把每次都会写的固定代码封装起来,比如服务端接受OP_ACCEPT事件,之后需要把accpet到的channel注册到多路复用器,这些都是基本固定代码,可以封装,而接受到客户端信息之后的处理方式是根据需求而定的,所以要暴露出来可以让用户自定义

    其次就是最重要的线程模型,肯定不能所有的请求都同一个线程去处理,最次也得用个线程池

    Channel的封装

    回头看NIO的代码,出现了两种jdk的channel:ServerSocketChannelSocketChannel,二者都继承了SelectableChannel,二者都有一些通用的行为,我们可以给它抽象出来一个channel类,里面封装了jdk的channel和感兴趣的事件,并提供了注册到多路复用器、绑定端口、设置非阻塞等行为,并可以继续泛化为服务端通道和客户端通道

    Channel下的管道

    回头看NIO的代码,每个channel发生事件后都会进行一些处理,所以这些处理的方式可以说是属于某个channel的,而一个channel下可能会有个步骤的处理工作,比如先解码再实际处理业务,我们可以把channel下每个处理步骤抽象成一个handler,多个handler互相连接组成channel下的一个管道(pipeline),这样channel的机构如下

    用户可以通过给管道下的pipeline添加处理步骤来改变时间放生时的响应,以实现自定义的客户端通道发来信息的响应

    而服务端的管道处理步骤基本是一定的,即读到channel注册到多路复用器,所以对应的handler我们可以封装一个在服务端通道建立时就绑定上

    EventLoop

    通道封装完了,通道发生事件后的处理步骤也可以用户自定义,下一步研究就是这些活到底谁来干,也就是方法写好了,下一步是用什么线程模型去执行这些方法能做到高效,所以现在需要的是一个执行者(Executor)

    我们想想实际场景,一个服务端可能会受到多个客户端的连接请求,我们要做的是高效的让请求多时可以用多个线程去处理,同时希望线程数可以根据场景来配置,针对这种情况,大师Doug Lea给出了以一种解决思路,写在《Scalable IO in Java》中,有兴趣可以自行百度,最终Lea给出的模型大概如下

    主要思路是有一个主线程可以通过selector监听服务端通道,在发生连接事件后,把连接的通道注册到子线程池中某一个线程下的selector中,这个子线程就负责使用selector监听客户端通道,发生事件后处理

    所以我们要有这样一种线程:

    • 内部有多路复用器selector,可以注册多个channel,并在事件放生是执行channel下绑定的处理步骤
    • 除了监控selector,也可以以执行给定任务

    这种线程可以处理事件(Event),同时它一经启动就不会自动关闭,因为要监控selector,所以内部一定是个死循环Loop,所以这种执行特定任务的线程就叫做EventLoop,说的更直白点,就是一个会不断响应多路复用器事件的多channel处理者

    而channel也可以注册到EventLoop,这样channel发生的事件就会由EventLoop按管道内部定义的handler执行处理

    EventLoop&EventLoopGroup

    EventLoop是一个特定的线程,那么EventLoopGroup就是这种特定线程的组合,也就是特定线程池,内部包含固定数量的EventLoop,而其对外提供的服务和内部的EventLoop组一模一样,只不过选了某一个EventLoop去具体执行服务,所以二者的方法是一样的,在netty源码中,EventLoop继承了EventLoopGroup,看到这里不免很糊涂,确实很怪,但可以这样去理解:EventLoopGroup是一群一模一样的EventLoop的组合,所以EventLoop能干什么,EventLoopGroup也就能且只能干什么,这样EventLoop就可以看成一个特殊的EventLoopGroup,只不过是只有一个对象的EventLoopGroup

    比如:有一个瓦匠群体,瓦匠群体的每个人都只会砌墙,那么群体对外能提供的服务即砌墙,只不过是选一个瓦匠去砌墙,而某个瓦匠对外提供的服务也是砌墙,那么他可以看做一个特殊的瓦匠群体

    uml

    结合以上的设计思路,再尽量贴近netty的源码,最终画出的uml图如下

    与上面的思路相比,更细致了多层的抽象类,尽量单一职责,主要是为了和netty基本结构一致

    相比于netty,主要省略了EventExecutor和EventExecutorGroup,把对应的代码写在了EventLoop和EventLoopGroup中,省略了各种Unsafe内部类,把Unsafe的方法直接写在外部类

    每个类的具体意义和实现代码中介绍

    Channel管道

    Channel

    首先是抽象的channel接口,主要方法即绑定EventLoop,绑定端口和获取pipeline

    1. public interface Channel {
    2. /**
    3. * 绑定事件持续处理器
    4. * @param eventLoop
    5. */
    6. void register(EventLoop eventLoop);
    7. /**
    8. * 获取事件持续处理器
    9. * @return
    10. */
    11. EventLoop eventLoop();
    12. /**
    13. * 通道内部的管道
    14. * @return
    15. */
    16. ChannelPipeline pipeline();
    17. /**
    18. * 绑定端口
    19. * @param localAddress
    20. */
    21. void bind(SocketAddress localAddress);
    22. /**
    23. * 开始读取,nio的实现即注册感兴趣的事件
    24. */
    25. void beginRead();
    26. }
    27. 复制代码

    ChannelHandler

    即管道绑定的处理器,一个管道对应一个pipeline,一个pipeline用链表形式存储多个处理器,这个我简化了一下,只是空接口,他有两种子类ChannelInboundHandler和ChannelOutboundHandler,分别管道进入即读事件的处理器和管道流出即写事件的处理器

    1. public interface ChannelHandler {
    2. }
    3. 复制代码

    ChannelInboundHandler

    管道进入事件处理器,与之对应管道返回事件处理器,我并没有写

    1. public interface ChannelInboundHandler extends ChannelHandler{
    2. void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
    3. void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
    4. }
    5. 复制代码

    ChannelHandlerContext

    由于用户再写处理器时需要得知上下文信息,即channel是哪个(可以通过channel写回数据),eventLoop是哪个,所以需要用一个上下文对象把handler包装起来,同时Context对象还保存了链表的关系,使得handler形成链表

    1. public class ChannelHandlerContext {
    2. private final ChannelHandler handler;
    3. /**
    4. * 链表
    5. */
    6. volatile ChannelHandlerContext next;
    7. volatile ChannelHandlerContext prev;
    8. private final ChannelPipeline pipeline;
    9. public ChannelHandlerContext(ChannelPipeline pipeline, ChannelHandler handler) {
    10. this.pipeline = pipeline;
    11. this.handler = handler;
    12. }
    13. /**
    14. * 当前通道
    15. * @return
    16. */
    17. public Channel channel() {
    18. return pipeline.channel();
    19. }
    20. /**
    21. * 当前管道
    22. * @return
    23. */
    24. public ChannelPipeline pipeline() {
    25. return pipeline;
    26. }
    27. /**
    28. * 当前执行器
    29. * @return
    30. */
    31. public EventLoop executor() {
    32. return channel().eventLoop();
    33. }
    34. public ChannelHandler handler() {
    35. return handler;
    36. }
    37. /**
    38. * 把信息传给链表下一个read节点去处理
    39. * @param msg
    40. * @return
    41. */
    42. public ChannelHandlerContext fireChannelRead(final Object msg) {
    43. findContextInbound().invokeChannelRead(msg);
    44. return this;
    45. }
    46. /**
    47. * 找到自己后面的Inbound处理器
    48. * @return
    49. */
    50. private ChannelHandlerContext findContextInbound() {
    51. ChannelHandlerContext ctx = this;
    52. do {
    53. ctx = ctx.next;
    54. } while (!(ctx.handler() instanceof ChannelInboundHandler));
    55. return ctx;
    56. }
    57. /**
    58. * 调用handler的channelRead方法
    59. * @param msg
    60. */
    61. private void invokeChannelRead(Object msg) {
    62. try {
    63. ((ChannelInboundHandler) handler()).channelRead(this, msg);
    64. } catch (Throwable t) {
    65. // 如果当前的handler不是ChannelInboundHandler则报错
    66. }
    67. }
    68. }
    69. 复制代码

    ChannelPipeline

    保存链表的收尾,因为链表是双向链表(in和out反向),同时支持添加新处理器至链表

    1. public class ChannelPipeline {
    2. /**
    3. * 管道的第一个处理器,管道的处理器是链式结构
    4. */
    5. final ChannelHandlerContext head;
    6. final ChannelHandlerContext tail;
    7. /**
    8. * 所在的通道
    9. */
    10. private final Channel channel;
    11. public ChannelPipeline(Channel channel) {
    12. this.channel = channel;
    13. head = new HeadContext(this);
    14. tail = new TailContext(this);
    15. // 头尾互指
    16. head.next = tail;
    17. tail.prev = head;
    18. }
    19. public final Channel channel() {
    20. return channel;
    21. }
    22. /**
    23. * 添加处理器
    24. * @param handler
    25. * @return
    26. */
    27. public final ChannelPipeline addLast(ChannelHandler handler) {
    28. // 把handler包装成上下文
    29. ChannelHandlerContext newCtx = new ChannelHandlerContext(this, handler);
    30. addLast0(newCtx);
    31. return this;
    32. }
    33. /**
    34. * 在链表结尾添加新的节点
    35. * @param newCtx
    36. */
    37. private void addLast0(ChannelHandlerContext newCtx) {
    38. ChannelHandlerContext prev = tail.prev;
    39. newCtx.prev = prev;
    40. newCtx.next = tail;
    41. prev.next = newCtx;
    42. tail.prev = newCtx;
    43. }
    44. /**
    45. * 结尾,简化处理
    46. */
    47. final class TailContext extends ChannelHandlerContext {
    48. public TailContext(ChannelPipeline pipeline) {
    49. super(pipeline, null);
    50. }
    51. }
    52. /**
    53. * 头部简化处理
    54. */
    55. final class HeadContext extends ChannelHandlerContext {
    56. public HeadContext(ChannelPipeline pipeline) {
    57. super(pipeline, null);
    58. }
    59. }
    60. /**
    61. * 开始处理read操作
    62. * @param msg
    63. * @return
    64. */
    65. public final ChannelPipeline fireChannelRead(Object msg) {
    66. head.fireChannelRead(msg);
    67. return this;
    68. }
    69. public final ChannelPipeline fireChannelReadComplete() {
    70. // 省略不写了,和fireChannelRead差不多道理
    71. return this;
    72. }
    73. }
    74. 复制代码

    AbstractChannel

    AbstractChannel 抽象的channel实现,主要实现了绑定EventLoop对象和初始化ChannelPipeline,也就是构造出Channel数据结构

    1. public abstract class AbstractChannel implements Channel {
    2. /**
    3. * 父通道
    4. */
    5. private final Channel parent;
    6. /**
    7. * 绑定的事件循环器
    8. */
    9. private volatile EventLoop eventLoop;
    10. /**
    11. * 管道
    12. */
    13. private final ChannelPipeline pipeline;
    14. public AbstractChannel(Channel parent) {
    15. this.parent = parent;
    16. pipeline = newChannelPipeline();
    17. }
    18. protected ChannelPipeline newChannelPipeline() {
    19. return new ChannelPipeline(this);
    20. }
    21. /**
    22. * 返回绑定的事件处理器
    23. *
    24. * @return
    25. */
    26. @Override
    27. public EventLoop eventLoop() {
    28. return eventLoop;
    29. }
    30. @Override
    31. public ChannelPipeline pipeline() {
    32. return pipeline;
    33. }
    34. @Override
    35. public void beginRead() {
    36. doBeginRead();
    37. }
    38. protected abstract void doBeginRead();
    39. @Override
    40. public void bind(SocketAddress localAddress) {
    41. try {
    42. doBind(localAddress);
    43. } catch (Exception e) {
    44. e.printStackTrace();
    45. }
    46. }
    47. protected abstract void doBind(SocketAddress localAddress) throws Exception;
    48. protected void doRegister() throws Exception {
    49. // NOOP
    50. }
    51. /**
    52. * Channel绑定eventLoop
    53. *
    54. * @param eventLoop
    55. */
    56. @Override
    57. public final void register(EventLoop eventLoop) {
    58. // 省去乱七八遭的判断,源码实在内部类Unsafe下,所以是:AbstractChannel.this
    59. AbstractChannel.this.eventLoop = eventLoop;
    60. eventLoop.execute(() -> {
    61. register0();
    62. });
    63. }
    64. /**
    65. * 实际注册
    66. */
    67. private void register0() {
    68. try {
    69. doRegister();
    70. // 开始读取感兴趣事件
    71. beginRead();
    72. } catch (Exception e) {
    73. e.printStackTrace();
    74. }
    75. }
    76. }
    77. 复制代码

    通过new ChannelPipeline(this),初始化了一个channel下的空管道,register方法只是存储EventLoop对象,而实际的doRegister抽象处理,因为不同的channel注册方式不一样,NIO是注册到多路复用器,其它的注册方式并非如此

    AbstractNioChannel

    这个就是专门处理NIO的抽象channel了,就可以实际去实现NIO的注册了

    1. public abstract class AbstractNioChannel extends AbstractChannel {
    2. /**
    3. * java的channel 包括ServerSocketChannel和SocketChannel
    4. */
    5. private final SelectableChannel ch;
    6. private SelectionKey selectionKey;
    7. /**
    8. * 感兴趣的事件
    9. */
    10. protected final int readInterestOp;
    11. public AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    12. super(parent);
    13. this.ch = ch;
    14. this.readInterestOp = readInterestOp;
    15. try {
    16. ch.configureBlocking(false);
    17. } catch (IOException e) {
    18. e.printStackTrace();
    19. }
    20. }
    21. /**
    22. * 返回java的channel
    23. *
    24. * @return
    25. */
    26. protected SelectableChannel javaChannel() {
    27. return ch;
    28. }
    29. /**
    30. * 把通道注册到多路复用器
    31. * @throws Exception
    32. */
    33. @Override
    34. protected void doRegister() throws Exception {
    35. // 最后一个字段this,相当于selectionKey.attach(this),后续可以通过attachment()方法取到
    36. // 由于多个channel注册到一个eventLoop,所以需要传递当前的channel以便eventLoop获取到事件时可以知道是哪个channel产生的事件
    37. selectionKey = javaChannel().register(((NioEventLoop) eventLoop()).unwrappedSelector(), 0, this);
    38. }
    39. /**
    40. * 注册感兴趣事件
    41. */
    42. @Override
    43. protected void doBeginRead() {
    44. selectionKey.interestOps(readInterestOp);
    45. }
    46. /**
    47. * 从 {@link SelectableChannel} 读取事件,源码是写在Unsafe里
    48. */
    49. public abstract void read();
    50. }
    51. 复制代码

    存储了SelectableChannel,即ServerSocketChannel和SocketChannel的共同父类

    实现了doRegister方法,即把jdk的channel注册到EventLoop下面的多路复用器

    实现了doBeginRead方法,即注册感兴趣事件

    抽象了一个read方法,即从channel读取信息,由于客户端与服务端读取方法不一样,所以抽象出来

    而客户端与服务端的read实现也是分别抽象了两个类来提交给channel的handler,即AbstractNioMessageChannel和AbstractNioByteChannel

    AbstractNioMessageChannel

    主要封装了通过抽象doReadMessages读取事件信息后传递给channel的管道

    1. public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
    2. /**
    3. * 读取到的缓存
    4. */
    5. private final List<Object> readBuf = new ArrayList<Object>();
    6. public AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    7. super(parent, ch, readInterestOp);
    8. }
    9. /**
    10. * 从SelectableChannel中读取信息
    11. */
    12. @Override
    13. public void read() {
    14. final ChannelPipeline pipeline = pipeline();
    15. // 实际读取信息,由子类实现
    16. doReadMessages(readBuf);
    17. int size = readBuf.size();
    18. for (int i = 0; i < size; i ++) {
    19. // 调用管道的read处理器
    20. pipeline.fireChannelRead(readBuf.get(i));
    21. }
    22. readBuf.clear();
    23. }
    24. protected abstract int doReadMessages(List<Object> buf);
    25. }
    26. 复制代码

    其中doReadMessages实际读取信息,看一下子类NioServerSocketChannel如何实现

    NioServerSocketChannel

    NIO服务端通道,实现了doReadMessages,即读取客户端channel

    1. public class NioServerSocketChannel extends AbstractNioMessageChannel {
    2. /**
    3. * 多路复用器提供者
    4. */
    5. private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    6. /**
    7. * 开启一个 java ServerSocketChannel
    8. * @param provider
    9. * @return
    10. */
    11. private static ServerSocketChannel newSocket(SelectorProvider provider) {
    12. try {
    13. return provider.openServerSocketChannel();
    14. } catch (IOException e) {
    15. return null;
    16. }
    17. }
    18. public NioServerSocketChannel() {
    19. this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    20. }
    21. public NioServerSocketChannel(ServerSocketChannel channel) {
    22. super(null, channel, SelectionKey.OP_ACCEPT);
    23. }
    24. /**
    25. * 覆盖,因为可以确定返回的是ServerSocketChannel
    26. * @return
    27. */
    28. @Override
    29. protected ServerSocketChannel javaChannel() {
    30. return (ServerSocketChannel) super.javaChannel();
    31. }
    32. /**
    33. * 绑定端socket
    34. * @param localAddress
    35. * @throws Exception
    36. */
    37. @Override
    38. protected void doBind(SocketAddress localAddress) throws Exception {
    39. javaChannel().socket().bind(localAddress);
    40. }
    41. /**
    42. * 读取信息,作为SeverSocketChannel(服务端通道),读取信息即accept后的SocketChannel(客户端通道)
    43. * @param buf
    44. * @return
    45. */
    46. @Override
    47. protected int doReadMessages(List<Object> buf) {
    48. SocketChannel ch = null;
    49. try {
    50. ch = javaChannel().accept();
    51. } catch (IOException e) {
    52. }
    53. if (ch != null) {
    54. buf.add(new NioSocketChannel(this, ch));
    55. return 1;
    56. }
    57. return 0;
    58. }
    59. }
    60. 复制代码

    同时封装了ServerSocketChannel的创建,并和感兴趣SelectionKey.OP_ACCEPT的事件传递给父类,实现了绑定端口javaChannel().socket().bind(localAddress)

    AbstractNioByteChannel

    与AbstractNioMessageChannel对应,是读取byte也就是字节并传递给channel的管道,实际的读取还是抽象的doReadBytes

    1. public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    2. public AbstractNioByteChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    3. super(parent, ch, readInterestOp);
    4. }
    5. @Override
    6. public void read() {
    7. final ChannelPipeline pipeline = pipeline();
    8. // 这里做了简化处理,源码用的自封装ByteBuf
    9. ByteBuffer byteBuf = ByteBuffer.allocate(128);
    10. doReadBytes(byteBuf);
    11. pipeline.fireChannelRead(byteBuf);
    12. }
    13. protected abstract int doReadBytes(ByteBuffer buf);
    14. }
    15. 复制代码

    源码使用自己封装的ByteBuf,这里简化了,看一下它的子类即NioSocketChannel

    NioSocketChannel

    客户端的channel

    1. public class NioSocketChannel extends AbstractNioByteChannel {
    2. /**
    3. * 多路复用器提供者
    4. */
    5. private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    6. /**
    7. * 开启一个 java SocketChannel 这个方法为客户端创建通道使用
    8. *
    9. * @param provider
    10. * @return
    11. */
    12. private static SocketChannel newSocket(SelectorProvider provider) {
    13. try {
    14. return provider.openSocketChannel();
    15. } catch (IOException e) {
    16. return null;
    17. }
    18. }
    19. public NioSocketChannel() {
    20. this(null, newSocket(DEFAULT_SELECTOR_PROVIDER));
    21. }
    22. /**
    23. * SocketChannel 感兴趣的事件是READ
    24. *
    25. * @param parent
    26. * @param channel
    27. */
    28. public NioSocketChannel(Channel parent, SocketChannel channel) {
    29. super(parent, channel, SelectionKey.OP_READ);
    30. }
    31. /**
    32. * 覆盖,因为可以确定返回的是SocketChannel
    33. *
    34. * @return
    35. */
    36. @Override
    37. protected SocketChannel javaChannel() {
    38. return (SocketChannel) super.javaChannel();
    39. }
    40. /**
    41. * 绑定端socket
    42. *
    43. * @param localAddress
    44. * @throws Exception
    45. */
    46. @Override
    47. protected void doBind(SocketAddress localAddress) throws Exception {
    48. javaChannel().socket().bind(localAddress);
    49. }
    50. @Override
    51. protected int doReadBytes(ByteBuffer buf) {
    52. try {
    53. return javaChannel().read(buf);
    54. } catch (IOException e) {
    55. throw new RuntimeException();
    56. }
    57. }
    58. }
    59. 复制代码

    这个和Server端差不多,只不过一个读channel,一个读字节。感兴趣的事件是SelectionKey.OP_READ

    到此channel相关类写完~

    EventLoop事件循环器

    EventLoopGroup

    事件循环器组,和EventLoop提供一样的功能,同时可以选择下一个EventLoop且可以迭代

    1. public interface EventLoopGroup extends Executor, Iterable<EventLoop>{
    2. void register(Channel channel);
    3. EventLoop next();
    4. @Override
    5. Iterator iterator();
    6. }
    7. 复制代码

    EventLoop

    继承EventLoopGroup,并且可以查到父Group

    1. public interface EventLoop extends EventLoopGroup {
    2. EventLoopGroup parent();
    3. }
    4. 复制代码

    SingleThreadEventExecutor

    本来打算把EventExecutor去掉,代码写在EventLoop,但这个类太重要了,所以保留了下来

    1. public abstract class SingleThreadEventExecutor implements Executor {
    2. /**
    3. * 默认任务列表长度
    4. */
    5. protected static final int DEFAULT_MAX_PENDING_TASKS = 16;
    6. /**
    7. * 待完成的任务
    8. */
    9. private final Queue<Runnable> taskQueue;
    10. /**
    11. * 实际工作者
    12. */
    13. private final Executor executor;
    14. /**
    15. * 当前运行线程
    16. */
    17. private volatile Thread thread;
    18. /**
    19. * ST_NOT_STARTED: 未启动, ST_STARTED 已启动
    20. */
    21. private static final int ST_NOT_STARTED = 1;
    22. private static final int ST_STARTED = 2;
    23. /**
    24. * 标记是否启动
    25. */
    26. private volatile int state = ST_NOT_STARTED;
    27. /**
    28. * 原子启动标记更新器
    29. */
    30. private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
    31. AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
    32. public SingleThreadEventExecutor(Executor executor) {
    33. taskQueue = newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
    34. this.executor = executor;
    35. }
    36. /**
    37. * 初始化一个新的任务对列
    38. * @param maxPendingTasks
    39. * @return
    40. */
    41. protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    42. return new LinkedBlockingQueue<>(maxPendingTasks);
    43. }
    44. /**
    45. * 添加任务
    46. * @param task
    47. */
    48. protected void addTask(Runnable task) {
    49. if (task == null) {
    50. throw new NullPointerException("task");
    51. }
    52. if (!taskQueue.offer(task)) {
    53. throw new RejectedExecutionException("event executor terminated");
    54. }
    55. }
    56. /**
    57. * 检查是否有任务
    58. *
    59. * @return
    60. */
    61. protected boolean hasTasks() {
    62. return !taskQueue.isEmpty();
    63. }
    64. /**
    65. * 运行所有任务
    66. * @return
    67. */
    68. protected boolean runAllTasks() {
    69. // 省略乱七八糟的判断,把多个子方法简化
    70. Runnable task = taskQueue.poll();
    71. if (task == null) {
    72. return false;
    73. }
    74. for (;;) {
    75. task.run();
    76. task = taskQueue.poll();
    77. if (task == null) {
    78. return true;
    79. }
    80. }
    81. }
    82. @Override
    83. public void execute(Runnable task) {
    84. addTask(task);
    85. startThread();
    86. }
    87. /**
    88. * 开启线程执行(判断已启动过不再启动)
    89. */
    90. private void startThread() {
    91. // 未启动才能启动,也就是只启动一次
    92. if (state == ST_NOT_STARTED) {
    93. // 再次CAS判断避免线程不安全
    94. if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
    95. doStartThread();
    96. }
    97. }
    98. }
    99. /**
    100. * 实际开启线程执行run方法
    101. */
    102. private void doStartThread() {
    103. // 使用真实的执行者执行任务
    104. executor.execute(() -> {
    105. // 保存执行的 线程
    106. thread = Thread.currentThread();
    107. // 省去乱起八遭的判断
    108. SingleThreadEventExecutor.this.run();
    109. // 如果执行结束,则报错
    110. System.out.println("Buggy EventExecutor implementation; SingleThreadEventExecutor.confirmShutdown() must be called before run() implementation terminates");
    111. });
    112. }
    113. /**
    114. * 抽象run方法,是一个不能运行结束的方法(除非手动关闭),即loop
    115. */
    116. protected abstract void run();
    117. }
    118. 复制代码

    可以这么概括它:SingleThreadEventExecutor是一种特殊的任务执行器,第一次收到任务它就会启动(开启一个线程运行run方法,run方法是一个会一直执行的方法,否则报错),当任务来时并不是立即执行,它们会加入到自己的任务对列中,并且按自己的套路(run)去执行

    SingleThreadEventLoop

    单线程事件循环器

    1. public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    2. private final Collection<EventLoop> selfCollection = Collections.<EventLoop>singleton(this);
    3. private final EventLoopGroup parent;
    4. public SingleThreadEventLoop(EventLoopGroup parent, Executor executor) {
    5. super(executor);
    6. this.parent = parent;
    7. }
    8. @Override
    9. public EventLoopGroup parent() {
    10. return parent;
    11. }
    12. @Override
    13. public void register(Channel channel) {
    14. // netty源码 promise.channel().unsafe().register(this, promise); 简化不区分unsafe,如下
    15. channel.register(this);
    16. }
    17. @Override
    18. public EventLoop next() {
    19. return this;
    20. }
    21. @Override
    22. public Iterator<EventLoop> iterator() {
    23. return selfCollection.iterator();
    24. }
    25. }
    26. 复制代码

    这个类其实就是实现了parent(),和next() ,iterator()这种应对Group接口的方法,之所以敢称单线程,是因为他爹是单线程处理器SingleThreadEventExecutor,它的存在就是为了让子类不用再处理与Group的关系

    NioEventLoop

    重点来了,Nio版的事件事件循环器

    1. public class NioEventLoop extends SingleThreadEventLoop {
    2. private Selector selector;
    3. private final SelectorProvider provider;
    4. public NioEventLoop(NioEventLoopGroup parent, SelectorProvider selectorProvider, Executor executor) {
    5. super(parent, executor);
    6. this.provider = selectorProvider;
    7. this.selector = openSelector();
    8. }
    9. public SelectorProvider selectorProvider() {
    10. return provider;
    11. }
    12. private Selector openSelector() {
    13. try {
    14. return selectorProvider().openSelector();
    15. } catch (IOException e) {
    16. e.printStackTrace();
    17. return null;
    18. }
    19. }
    20. /**
    21. * 由于没做装饰,所以selector即unwrappedSelector
    22. *
    23. * @return
    24. */
    25. public Selector unwrappedSelector() {
    26. return selector;
    27. }
    28. /**
    29. * 运行
    30. */
    31. @Override
    32. protected void run() {
    33. for (;;) {
    34. try {
    35. select();
    36. } catch (IOException e) {
    37. e.printStackTrace();
    38. }
    39. try {
    40. processSelectedKeys();
    41. } finally {
    42. runAllTasks();
    43. }
    44. }
    45. }
    46. private void select() throws IOException {
    47. // 拿到多路复用器
    48. Selector selector = this.selector;
    49. for (;;) {
    50. // 等待,简化固定1
    51. int selectedKeys = selector.select(1000);
    52. // 如果有事件发生或当前有任务跳出循环
    53. if (selectedKeys != 0 || hasTasks()) {
    54. break;
    55. }
    56. }
    57. }
    58. private void processSelectedKeys() {
    59. processSelectedKeysPlain(selector.selectedKeys());
    60. }
    61. private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    62. if (selectedKeys.isEmpty()) {
    63. return;
    64. }
    65. Iterator<SelectionKey> i = selectedKeys.iterator();
    66. for (;;) {
    67. final SelectionKey k = i.next();
    68. final Object a = k.attachment();
    69. i.remove();
    70. // 获取注册时绑定的参数
    71. if (a instanceof AbstractNioChannel) {
    72. processSelectedKey(k, (AbstractNioChannel) a);
    73. } else {
    74. // 由于手写简版只attach了AbstractNioChannel所以不会出现,但源码有其它的attach
    75. }
    76. if (!i.hasNext()) {
    77. break;
    78. }
    79. }
    80. }
    81. /**
    82. * 处理单个事件
    83. * @param k
    84. * @param ch
    85. */
    86. private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    87. // 这个try是为了和源码尽量长得像,简版不处理异常
    88. try {
    89. // 获取发生的事件标识
    90. int readyOps = k.readyOps();
    91. // 如果是read事件 或 accpet事件
    92. if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    93. // channel读取
    94. ch.read();
    95. }
    96. } catch (Exception e) {
    97. }
    98. }
    99. }
    100. 复制代码

    内部携带了一个多路复用器,作为一个SingleThreadEventExecutor,它的运行套路是不断的监听selector,如果任务队列有任务,就处理任务,这里我简化了代码,监听1秒再去查看是否有任务,没有再回来监听,源码是有个策略判断该执行任务还是该阻塞到selector上

    MultithreadEventLoopGroup

    它存在的意义和SingleThreadEventLoop差不多,帮助子类处理group和成员的关系,实现了next方法(这里我简化了用轮训,源码可以自定义chooser)

    1. public abstract class MultithreadEventLoopGroup implements EventLoopGroup {
    2. private final EventLoop[] children;
    3. /**
    4. * 为了迭代用
    5. */
    6. private final Set<EventLoop> readonlyChildren;
    7. public MultithreadEventLoopGroup(int nThreads, Executor executor) {
    8. if (nThreads <= 0) {
    9. throw new IllegalArgumentException();
    10. }
    11. if (executor == null) {
    12. executor = new ThreadPerTaskExecutor(new DefaultThreadFactory());
    13. }
    14. this.children = new EventLoop[nThreads];
    15. for (int i = 0; i < nThreads; i ++) {
    16. children[i] = newChild(executor);
    17. }
    18. /**
    19. * 为了迭代用
    20. */
    21. Set<EventLoop> childrenSet = new LinkedHashSet<EventLoop>(children.length);
    22. Collections.addAll(childrenSet, children);
    23. readonlyChildren = Collections.unmodifiableSet(childrenSet);
    24. }
    25. protected abstract EventLoop newChild(Executor executor);
    26. /**
    27. * 源码用一个chooser对象选择子线程,这里简化一下,就轮训吧
    28. * @return
    29. */
    30. int i=0;
    31. public EventLoop chooseNext() {
    32. if (i>=children.length) {
    33. i =0;
    34. }
    35. EventLoop child = children[i];
    36. i++;
    37. return child;
    38. }
    39. @Override
    40. public void register(Channel channel) {
    41. next().register(channel);
    42. }
    43. @Override
    44. public EventLoop next() {
    45. return chooseNext();
    46. }
    47. @Override
    48. public Iterator<EventLoop> iterator() {
    49. return readonlyChildren.iterator();
    50. }
    51. @Override
    52. public void execute(Runnable command) {
    53. next().execute(command);
    54. }
    55. }
    56. 复制代码

    抽象了newChild交给子类去实际创建组成员

    NioEventLoopGroup

    NioEventLoop的组,实现了newChild创建组成员即:NioEventLoop

    1. public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    2. public NioEventLoopGroup(int nThreads) {
    3. super(nThreads, null);
    4. }
    5. public NioEventLoopGroup(int nThreads, Executor executor) {
    6. super(nThreads, executor);
    7. }
    8. @Override
    9. protected EventLoop newChild(Executor executor) {
    10. return new NioEventLoop(this, SelectorProvider.provider(), executor);
    11. }
    12. }
    13. 复制代码

    ThreadPerTaskExecutor 和 DefaultThreadFactory

    这俩的存在主要是给提供真实线程,并统一命名

    1. public class ThreadPerTaskExecutor implements Executor {
    2. private final ThreadFactory threadFactory;
    3. public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
    4. if (threadFactory == null) {
    5. throw new NullPointerException("threadFactory");
    6. }
    7. this.threadFactory = threadFactory;
    8. }
    9. @Override
    10. public void execute(Runnable command) {
    11. threadFactory.newThread(command).start();
    12. }
    13. }
    14. 复制代码
    1. public class DefaultThreadFactory implements ThreadFactory {
    2. private AtomicInteger no = new AtomicInteger(0);
    3. @Override
    4. public Thread newThread(Runnable r) {
    5. return new Thread(r, "nio-thread-"+(no.incrementAndGet()));
    6. }
    7. }
    8. 复制代码

    Bootstrap

    channel和eventLoop都定义完了,接下来就要给他们串联起来,做启动类了,由于有服务端和客户端两种启动类,所以还是抽象了一个Bootstrap

    AbstractBootstrap

    1. public abstract class AbstractBootstrap extends AbstractBootstrap, C>, C extends Channel> {
    2. volatile EventLoopGroup group;
    3. /**
    4. * 源码使用工厂模式存储是channelFactory,这里简化处理
    5. */
    6. private volatile Channel channel;
    7. AbstractBootstrap() {
    8. // Disallow extending from a different package.
    9. }
    10. private B self() {
    11. return (B) this;
    12. }
    13. public B group(EventLoopGroup group) {
    14. this.group = group;
    15. return self();
    16. }
    17. public B channel(Classextends C> channelClass) {
    18. try {
    19. channel = channelClass.getConstructor().newInstance();
    20. } catch (Exception e) {
    21. }
    22. return self();
    23. }
    24. public void bind(int inetPort) {
    25. doBind(new InetSocketAddress(inetPort));
    26. }
    27. private void doBind(final SocketAddress localAddress) {
    28. initAndRegister();
    29. // 让channel绑定的线程去实际绑定
    30. channel.eventLoop().execute(()->{
    31. channel.bind(localAddress);
    32. });
    33. }
    34. final void initAndRegister() {
    35. init(channel);
    36. group.register(channel);
    37. }
    38. abstract void init(Channel channel);
    39. }
    40. 复制代码

    抽象了一个channel init方法,如果前面看懂了,这里应该能猜到主要是为了给ServerChannel添加默认的handler

    ServerBootstrap

    1. public class ServerBootstrap extends AbstractBootstrap {
    2. private volatile EventLoopGroup childGroup;
    3. /**
    4. * 这里简化处理
    5. */
    6. private volatile ChannelHandler[] childHandlers;
    7. public ServerBootstrap() { }
    8. public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    9. super.group(parentGroup);
    10. this.childGroup = childGroup;
    11. return this;
    12. }
    13. public ServerBootstrap childHandler(ChannelHandler... childHandlers) {
    14. this.childHandlers = childHandlers;
    15. return this;
    16. }
    17. @Override
    18. void init(Channel channel) {
    19. ChannelPipeline p = channel.pipeline();
    20. // 这里给ServerChannel添加管道处理器,简化了代码
    21. p.addLast(new ServerBootstrapAcceptor(childGroup, childHandlers));
    22. }
    23. private static class ServerBootstrapAcceptor implements ChannelInboundHandler {
    24. private final EventLoopGroup childGroup;
    25. private final ChannelHandler[] childHandlers;
    26. private ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler[] childHandlers) {
    27. this.childGroup = childGroup;
    28. this.childHandlers = childHandlers;
    29. }
    30. @Override
    31. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    32. final Channel child = (Channel) msg;
    33. for (ChannelHandler childHandler : childHandlers) {
    34. child.pipeline().addLast(childHandler);
    35. }
    36. childGroup.register(child);
    37. }
    38. @Override
    39. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    40. // 略
    41. }
    42. }
    43. }
    44. 复制代码

    子类ServerBootstrapAcceptor 就是ServerSocketChannel默认的handler,通过init方法添加上

    总结

    到此,山寨版的netty写完了,累了一头汗,代码的命名和类命名尽量和源码保持一致,因为都是复制过来的,有些很复杂的地方做了简化处理,但个人感觉核心的代码除了bytebuf应该都写上了,使用方式开头有写基本和netty差不多,还是那句话,手写的目的是理解netty源码

    篇幅有限,这里面的事好多代码都没细讲,但代码也摘的很轻量,完全可以自行理解,回头再看netty,基本就差不多

    回想一下,其实netty的核心概念就总结出来了:channel维护了jdk的通道,并可以设置后置处理器实现,EventLoop是针对channel事件的专用线程,而EventLoopGroup是它们组合即专用线程池

  • 相关阅读:
    VMware 虚拟机里连不上网的解决方案
    2023年天津农学院专升本专业课参考教材
    针对icon报错
    nginx的性能调优
    网格布局之网格线编号定位
    关于Webpack
    (译)TDD(测试驱动开发)的5个步骤
    csdn测开涨薪技术-Git原理及使用全套教程
    Serverless Devs 进入 CNCF 沙箱,成首个入选的 Serverless 工具项目
    【POI】word读取常见问题(涉及格式:doc、docx、rtf)
  • 原文地址:https://blog.csdn.net/m0_73311735/article/details/127788105