最近一直在看netty源码,观后感:很难看,于是为了屡清netty的设计思路,我参照netty源码手写一个山寨简版的“netty”,说是手写,其实也就是从源码复制出来核心的代码,并尽量保持命名,设计结构与源码基本一致,因为我的目的很明确:尝试以作者的角度理解netty的全貌
最终山寨版的netty代码server端使用如下(代码没有引用任何netty的依赖)
- public static void main(String[] args) {
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup(4);
- try {
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new NettyServerHandler(), new NettyServerHandler2());
- System.out.println("netty server start...");
- bootstrap.bind(9000);
- } finally {
- }
- }
- 复制代码
其中NettyHandler:
- public class NettyServerHandler implements ChannelInboundHandler {
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- System.out.println("thread: " + Thread.currentThread().getName());
- System.out.println("msg:" + new String(((ByteBuffer) msg).array()));
- // 传递给下个handler
- ctx.fireChannelRead(msg);
- }
- }
- 复制代码
看起来和真的netty其实差不多,最终执行的效果也比较符合预期
首先,在学习netty前,要了解它是干嘛的,从我们常使用的场景上看可以说NETTY其实就是对NIO编程的一种封装,所以在理解netty之前,nio的基础知识是必须要掌握的,下面是一个NIO编程的基础server端代码:
- public class NioServer {
-
- public static void main(String[] args) throws IOException, InterruptedException {
-
- // 创建NIO ServerSocketChannel
- ServerSocketChannel serverSocket = ServerSocketChannel.open();
- serverSocket.socket().bind(new InetSocketAddress(9000));
- serverSocket.configureBlocking(false);
- // 打开Selector处理Channel,底层调用epoll_create
- Selector selector = Selector.open();
- // 把ServerSocketChannel注册到selector上,并且selector对客户端accept连接操作感兴趣,底层调用epoll_ctl
- // SelectionKey registerKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
- SelectionKey serverRegisterKey = serverSocket.register(selector, 0);
- serverRegisterKey.interestOps(SelectionKey.OP_ACCEPT);
- // 测试attach
- serverRegisterKey.attach(new NioServer());
- System.out.println("服务启动成功");
-
- while (true) {
- // 阻塞等待需要处理的事件发生,即调用epoll_wait
- selector.select();
- // 获取selector中注册的全部事件的 SelectionKey 实例
- Set<SelectionKey> selectionKeys = selector.selectedKeys();
- Iterator<SelectionKey> iterator = selectionKeys.iterator();
-
- // 遍历SelectionKey对事件进行处理
- while (iterator.hasNext()) {
- SelectionKey key = iterator.next();
- // 测试attachment
- System.out.println("attachment: "+ key.attachment());
- // 测试key
- System.out.println("key is serverRegisterKey:" + key.equals(serverRegisterKey));
- // 如果是OP_ACCEPT事件,则进行连接获取和事件注册
- if (key.isAcceptable()) {
- ServerSocketChannel server = (ServerSocketChannel) key.channel();
- SocketChannel socketChannel = server.accept();
- socketChannel.configureBlocking(false);
- // 这里只注册了读事件,如果需要给客户端发送数据可以注册写事件,底层调用epoll_ctl
- socketChannel.register(selector, SelectionKey.OP_READ);
- System.out.println("客户端连接成功");
- } else if (key.isReadable()) { // 如果是OP_READ事件,则进行读取和打印
- SocketChannel socketChannel = (SocketChannel) key.channel();
- ByteBuffer byteBuffer = ByteBuffer.allocate(128);
- int len = socketChannel.read(byteBuffer);
- // 如果有数据,把数据打印出来
- if (len > 0) {
- System.out.println("接收到消息:" + new String(byteBuffer.array()));
- } else if (len == -1) { // 如果客户端断开连接,关闭Socket
- System.out.println("客户端断开连接");
- socketChannel.close();
- }
- }
- //从事件集合里删除本次处理的key,防止下次select重复处理
- iterator.remove();
- }
- }
- }
- }
- 复制代码
如果这里看不懂,可以参考Netty-理解selector是什么
上面的服务端代码有几个问题:
而netty的存在也就是为了解决这些问题,通过封装让NIO的编程变的简单易用
所以下面我们就假装以作者的设计角度,尝试着写一个"山寨"的netty
首先我们要把每次都会写的固定代码封装起来,比如服务端接受OP_ACCEPT事件,之后需要把accpet到的channel注册到多路复用器,这些都是基本固定代码,可以封装,而接受到客户端信息之后的处理方式是根据需求而定的,所以要暴露出来可以让用户自定义
其次就是最重要的线程模型,肯定不能所有的请求都同一个线程去处理,最次也得用个线程池
Channel的封装
回头看NIO的代码,出现了两种jdk的channel:ServerSocketChannel和SocketChannel,二者都继承了SelectableChannel,二者都有一些通用的行为,我们可以给它抽象出来一个channel类,里面封装了jdk的channel和感兴趣的事件,并提供了注册到多路复用器、绑定端口、设置非阻塞等行为,并可以继续泛化为服务端通道和客户端通道

Channel下的管道
回头看NIO的代码,每个channel发生事件后都会进行一些处理,所以这些处理的方式可以说是属于某个channel的,而一个channel下可能会有个步骤的处理工作,比如先解码再实际处理业务,我们可以把channel下每个处理步骤抽象成一个handler,多个handler互相连接组成channel下的一个管道(pipeline),这样channel的机构如下

用户可以通过给管道下的pipeline添加处理步骤来改变时间放生时的响应,以实现自定义的客户端通道发来信息的响应
而服务端的管道处理步骤基本是一定的,即读到channel注册到多路复用器,所以对应的handler我们可以封装一个在服务端通道建立时就绑定上
EventLoop
通道封装完了,通道发生事件后的处理步骤也可以用户自定义,下一步研究就是这些活到底谁来干,也就是方法写好了,下一步是用什么线程模型去执行这些方法能做到高效,所以现在需要的是一个执行者(Executor)
我们想想实际场景,一个服务端可能会受到多个客户端的连接请求,我们要做的是高效的让请求多时可以用多个线程去处理,同时希望线程数可以根据场景来配置,针对这种情况,大师Doug Lea给出了以一种解决思路,写在《Scalable IO in Java》中,有兴趣可以自行百度,最终Lea给出的模型大概如下

主要思路是有一个主线程可以通过selector监听服务端通道,在发生连接事件后,把连接的通道注册到子线程池中某一个线程下的selector中,这个子线程就负责使用selector监听客户端通道,发生事件后处理
所以我们要有这样一种线程:
这种线程可以处理事件(Event),同时它一经启动就不会自动关闭,因为要监控selector,所以内部一定是个死循环Loop,所以这种执行特定任务的线程就叫做EventLoop,说的更直白点,就是一个会不断响应多路复用器事件的多channel处理者
而channel也可以注册到EventLoop,这样channel发生的事件就会由EventLoop按管道内部定义的handler执行处理

EventLoop&EventLoopGroup
EventLoop是一个特定的线程,那么EventLoopGroup就是这种特定线程的组合,也就是特定线程池,内部包含固定数量的EventLoop,而其对外提供的服务和内部的EventLoop组一模一样,只不过选了某一个EventLoop去具体执行服务,所以二者的方法是一样的,在netty源码中,EventLoop继承了EventLoopGroup,看到这里不免很糊涂,确实很怪,但可以这样去理解:EventLoopGroup是一群一模一样的EventLoop的组合,所以EventLoop能干什么,EventLoopGroup也就能且只能干什么,这样EventLoop就可以看成一个特殊的EventLoopGroup,只不过是只有一个对象的EventLoopGroup
比如:有一个瓦匠群体,瓦匠群体的每个人都只会砌墙,那么群体对外能提供的服务即砌墙,只不过是选一个瓦匠去砌墙,而某个瓦匠对外提供的服务也是砌墙,那么他可以看做一个特殊的瓦匠群体

uml
结合以上的设计思路,再尽量贴近netty的源码,最终画出的uml图如下

与上面的思路相比,更细致了多层的抽象类,尽量单一职责,主要是为了和netty基本结构一致
相比于netty,主要省略了EventExecutor和EventExecutorGroup,把对应的代码写在了EventLoop和EventLoopGroup中,省略了各种Unsafe内部类,把Unsafe的方法直接写在外部类
每个类的具体意义和实现代码中介绍
Channel
首先是抽象的channel接口,主要方法即绑定EventLoop,绑定端口和获取pipeline
- public interface Channel {
- /**
- * 绑定事件持续处理器
- * @param eventLoop
- */
- void register(EventLoop eventLoop);
-
- /**
- * 获取事件持续处理器
- * @return
- */
- EventLoop eventLoop();
-
- /**
- * 通道内部的管道
- * @return
- */
- ChannelPipeline pipeline();
-
- /**
- * 绑定端口
- * @param localAddress
- */
- void bind(SocketAddress localAddress);
-
- /**
- * 开始读取,nio的实现即注册感兴趣的事件
- */
- void beginRead();
- }
- 复制代码
ChannelHandler
即管道绑定的处理器,一个管道对应一个pipeline,一个pipeline用链表形式存储多个处理器,这个我简化了一下,只是空接口,他有两种子类ChannelInboundHandler和ChannelOutboundHandler,分别管道进入即读事件的处理器和管道流出即写事件的处理器
- public interface ChannelHandler {
- }
- 复制代码
ChannelInboundHandler
管道进入事件处理器,与之对应管道返回事件处理器,我并没有写
- public interface ChannelInboundHandler extends ChannelHandler{
-
- void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
-
- void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
- }
- 复制代码
ChannelHandlerContext
由于用户再写处理器时需要得知上下文信息,即channel是哪个(可以通过channel写回数据),eventLoop是哪个,所以需要用一个上下文对象把handler包装起来,同时Context对象还保存了链表的关系,使得handler形成链表
- public class ChannelHandlerContext {
-
- private final ChannelHandler handler;
-
- /**
- * 链表
- */
- volatile ChannelHandlerContext next;
- volatile ChannelHandlerContext prev;
-
- private final ChannelPipeline pipeline;
-
- public ChannelHandlerContext(ChannelPipeline pipeline, ChannelHandler handler) {
- this.pipeline = pipeline;
- this.handler = handler;
- }
-
- /**
- * 当前通道
- * @return
- */
- public Channel channel() {
- return pipeline.channel();
- }
-
- /**
- * 当前管道
- * @return
- */
- public ChannelPipeline pipeline() {
- return pipeline;
- }
-
- /**
- * 当前执行器
- * @return
- */
- public EventLoop executor() {
- return channel().eventLoop();
- }
-
- public ChannelHandler handler() {
- return handler;
- }
-
- /**
- * 把信息传给链表下一个read节点去处理
- * @param msg
- * @return
- */
- public ChannelHandlerContext fireChannelRead(final Object msg) {
- findContextInbound().invokeChannelRead(msg);
- return this;
- }
-
- /**
- * 找到自己后面的Inbound处理器
- * @return
- */
- private ChannelHandlerContext findContextInbound() {
- ChannelHandlerContext ctx = this;
- do {
- ctx = ctx.next;
- } while (!(ctx.handler() instanceof ChannelInboundHandler));
- return ctx;
- }
-
- /**
- * 调用handler的channelRead方法
- * @param msg
- */
- private void invokeChannelRead(Object msg) {
- try {
- ((ChannelInboundHandler) handler()).channelRead(this, msg);
- } catch (Throwable t) {
- // 如果当前的handler不是ChannelInboundHandler则报错
- }
- }
-
- }
- 复制代码
ChannelPipeline
保存链表的收尾,因为链表是双向链表(in和out反向),同时支持添加新处理器至链表
- public class ChannelPipeline {
- /**
- * 管道的第一个处理器,管道的处理器是链式结构
- */
- final ChannelHandlerContext head;
- final ChannelHandlerContext tail;
- /**
- * 所在的通道
- */
- private final Channel channel;
-
- public ChannelPipeline(Channel channel) {
- this.channel = channel;
- head = new HeadContext(this);
- tail = new TailContext(this);
- // 头尾互指
- head.next = tail;
- tail.prev = head;
- }
-
- public final Channel channel() {
- return channel;
- }
-
- /**
- * 添加处理器
- * @param handler
- * @return
- */
- public final ChannelPipeline addLast(ChannelHandler handler) {
- // 把handler包装成上下文
- ChannelHandlerContext newCtx = new ChannelHandlerContext(this, handler);
- addLast0(newCtx);
- return this;
- }
-
- /**
- * 在链表结尾添加新的节点
- * @param newCtx
- */
- private void addLast0(ChannelHandlerContext newCtx) {
- ChannelHandlerContext prev = tail.prev;
- newCtx.prev = prev;
- newCtx.next = tail;
- prev.next = newCtx;
- tail.prev = newCtx;
- }
-
- /**
- * 结尾,简化处理
- */
- final class TailContext extends ChannelHandlerContext {
- public TailContext(ChannelPipeline pipeline) {
- super(pipeline, null);
- }
- }
-
- /**
- * 头部简化处理
- */
- final class HeadContext extends ChannelHandlerContext {
- public HeadContext(ChannelPipeline pipeline) {
- super(pipeline, null);
- }
- }
-
- /**
- * 开始处理read操作
- * @param msg
- * @return
- */
- public final ChannelPipeline fireChannelRead(Object msg) {
- head.fireChannelRead(msg);
- return this;
- }
-
- public final ChannelPipeline fireChannelReadComplete() {
- // 省略不写了,和fireChannelRead差不多道理
- return this;
- }
- }
- 复制代码
AbstractChannel
AbstractChannel 抽象的channel实现,主要实现了绑定EventLoop对象和初始化ChannelPipeline,也就是构造出Channel数据结构
- public abstract class AbstractChannel implements Channel {
-
- /**
- * 父通道
- */
- private final Channel parent;
-
- /**
- * 绑定的事件循环器
- */
- private volatile EventLoop eventLoop;
-
- /**
- * 管道
- */
- private final ChannelPipeline pipeline;
-
- public AbstractChannel(Channel parent) {
- this.parent = parent;
- pipeline = newChannelPipeline();
- }
-
- protected ChannelPipeline newChannelPipeline() {
- return new ChannelPipeline(this);
- }
-
- /**
- * 返回绑定的事件处理器
- *
- * @return
- */
- @Override
- public EventLoop eventLoop() {
- return eventLoop;
- }
-
- @Override
- public ChannelPipeline pipeline() {
- return pipeline;
- }
-
- @Override
- public void beginRead() {
- doBeginRead();
- }
-
- protected abstract void doBeginRead();
-
- @Override
- public void bind(SocketAddress localAddress) {
- try {
- doBind(localAddress);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- protected abstract void doBind(SocketAddress localAddress) throws Exception;
-
- protected void doRegister() throws Exception {
- // NOOP
- }
-
- /**
- * Channel绑定eventLoop
- *
- * @param eventLoop
- */
- @Override
- public final void register(EventLoop eventLoop) {
- // 省去乱七八遭的判断,源码实在内部类Unsafe下,所以是:AbstractChannel.this
- AbstractChannel.this.eventLoop = eventLoop;
- eventLoop.execute(() -> {
- register0();
- });
- }
-
- /**
- * 实际注册
- */
- private void register0() {
- try {
- doRegister();
- // 开始读取感兴趣事件
- beginRead();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- 复制代码
通过new ChannelPipeline(this),初始化了一个channel下的空管道,register方法只是存储EventLoop对象,而实际的doRegister抽象处理,因为不同的channel注册方式不一样,NIO是注册到多路复用器,其它的注册方式并非如此
AbstractNioChannel
这个就是专门处理NIO的抽象channel了,就可以实际去实现NIO的注册了
- public abstract class AbstractNioChannel extends AbstractChannel {
-
- /**
- * java的channel 包括ServerSocketChannel和SocketChannel
- */
- private final SelectableChannel ch;
-
- private SelectionKey selectionKey;
-
- /**
- * 感兴趣的事件
- */
- protected final int readInterestOp;
-
- public AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
- super(parent);
- this.ch = ch;
- this.readInterestOp = readInterestOp;
- try {
- ch.configureBlocking(false);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- /**
- * 返回java的channel
- *
- * @return
- */
- protected SelectableChannel javaChannel() {
- return ch;
- }
-
- /**
- * 把通道注册到多路复用器
- * @throws Exception
- */
- @Override
- protected void doRegister() throws Exception {
- // 最后一个字段this,相当于selectionKey.attach(this),后续可以通过attachment()方法取到
- // 由于多个channel注册到一个eventLoop,所以需要传递当前的channel以便eventLoop获取到事件时可以知道是哪个channel产生的事件
- selectionKey = javaChannel().register(((NioEventLoop) eventLoop()).unwrappedSelector(), 0, this);
- }
-
- /**
- * 注册感兴趣事件
- */
- @Override
- protected void doBeginRead() {
- selectionKey.interestOps(readInterestOp);
- }
-
- /**
- * 从 {@link SelectableChannel} 读取事件,源码是写在Unsafe里
- */
- public abstract void read();
-
- }
- 复制代码
存储了SelectableChannel,即ServerSocketChannel和SocketChannel的共同父类
实现了doRegister方法,即把jdk的channel注册到EventLoop下面的多路复用器
实现了doBeginRead方法,即注册感兴趣事件
抽象了一个read方法,即从channel读取信息,由于客户端与服务端读取方法不一样,所以抽象出来
而客户端与服务端的read实现也是分别抽象了两个类来提交给channel的handler,即AbstractNioMessageChannel和AbstractNioByteChannel
AbstractNioMessageChannel
主要封装了通过抽象doReadMessages读取事件信息后传递给channel的管道
- public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
- /**
- * 读取到的缓存
- */
- private final List<Object> readBuf = new ArrayList<Object>();
-
- public AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
- super(parent, ch, readInterestOp);
- }
-
- /**
- * 从SelectableChannel中读取信息
- */
- @Override
- public void read() {
- final ChannelPipeline pipeline = pipeline();
- // 实际读取信息,由子类实现
- doReadMessages(readBuf);
- int size = readBuf.size();
- for (int i = 0; i < size; i ++) {
- // 调用管道的read处理器
- pipeline.fireChannelRead(readBuf.get(i));
- }
- readBuf.clear();
- }
-
- protected abstract int doReadMessages(List<Object> buf);
- }
- 复制代码
其中doReadMessages实际读取信息,看一下子类NioServerSocketChannel如何实现
NioServerSocketChannel
NIO服务端通道,实现了doReadMessages,即读取客户端channel
- public class NioServerSocketChannel extends AbstractNioMessageChannel {
-
- /**
- * 多路复用器提供者
- */
- private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
-
- /**
- * 开启一个 java ServerSocketChannel
- * @param provider
- * @return
- */
- private static ServerSocketChannel newSocket(SelectorProvider provider) {
- try {
- return provider.openServerSocketChannel();
- } catch (IOException e) {
- return null;
- }
- }
-
- public NioServerSocketChannel() {
- this(newSocket(DEFAULT_SELECTOR_PROVIDER));
- }
-
- public NioServerSocketChannel(ServerSocketChannel channel) {
- super(null, channel, SelectionKey.OP_ACCEPT);
- }
-
- /**
- * 覆盖,因为可以确定返回的是ServerSocketChannel
- * @return
- */
- @Override
- protected ServerSocketChannel javaChannel() {
- return (ServerSocketChannel) super.javaChannel();
- }
-
- /**
- * 绑定端socket
- * @param localAddress
- * @throws Exception
- */
- @Override
- protected void doBind(SocketAddress localAddress) throws Exception {
- javaChannel().socket().bind(localAddress);
- }
-
- /**
- * 读取信息,作为SeverSocketChannel(服务端通道),读取信息即accept后的SocketChannel(客户端通道)
- * @param buf
- * @return
- */
- @Override
- protected int doReadMessages(List<Object> buf) {
- SocketChannel ch = null;
- try {
- ch = javaChannel().accept();
- } catch (IOException e) {
- }
- if (ch != null) {
- buf.add(new NioSocketChannel(this, ch));
- return 1;
- }
- return 0;
- }
-
- }
- 复制代码
同时封装了ServerSocketChannel的创建,并和感兴趣SelectionKey.OP_ACCEPT的事件传递给父类,实现了绑定端口javaChannel().socket().bind(localAddress)
AbstractNioByteChannel
与AbstractNioMessageChannel对应,是读取byte也就是字节并传递给channel的管道,实际的读取还是抽象的doReadBytes
- public abstract class AbstractNioByteChannel extends AbstractNioChannel {
-
- public AbstractNioByteChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
- super(parent, ch, readInterestOp);
- }
-
- @Override
- public void read() {
- final ChannelPipeline pipeline = pipeline();
- // 这里做了简化处理,源码用的自封装ByteBuf
- ByteBuffer byteBuf = ByteBuffer.allocate(128);
- doReadBytes(byteBuf);
- pipeline.fireChannelRead(byteBuf);
- }
-
- protected abstract int doReadBytes(ByteBuffer buf);
- }
- 复制代码
源码使用自己封装的ByteBuf,这里简化了,看一下它的子类即NioSocketChannel
NioSocketChannel
客户端的channel
- public class NioSocketChannel extends AbstractNioByteChannel {
-
- /**
- * 多路复用器提供者
- */
- private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
-
- /**
- * 开启一个 java SocketChannel 这个方法为客户端创建通道使用
- *
- * @param provider
- * @return
- */
- private static SocketChannel newSocket(SelectorProvider provider) {
- try {
- return provider.openSocketChannel();
- } catch (IOException e) {
- return null;
- }
- }
-
- public NioSocketChannel() {
- this(null, newSocket(DEFAULT_SELECTOR_PROVIDER));
- }
-
- /**
- * SocketChannel 感兴趣的事件是READ
- *
- * @param parent
- * @param channel
- */
- public NioSocketChannel(Channel parent, SocketChannel channel) {
- super(parent, channel, SelectionKey.OP_READ);
- }
-
- /**
- * 覆盖,因为可以确定返回的是SocketChannel
- *
- * @return
- */
- @Override
- protected SocketChannel javaChannel() {
- return (SocketChannel) super.javaChannel();
- }
-
- /**
- * 绑定端socket
- *
- * @param localAddress
- * @throws Exception
- */
- @Override
- protected void doBind(SocketAddress localAddress) throws Exception {
- javaChannel().socket().bind(localAddress);
- }
-
- @Override
- protected int doReadBytes(ByteBuffer buf) {
- try {
- return javaChannel().read(buf);
- } catch (IOException e) {
- throw new RuntimeException();
- }
- }
- }
- 复制代码
这个和Server端差不多,只不过一个读channel,一个读字节。感兴趣的事件是SelectionKey.OP_READ
到此channel相关类写完~
EventLoopGroup
事件循环器组,和EventLoop提供一样的功能,同时可以选择下一个EventLoop且可以迭代
- public interface EventLoopGroup extends Executor, Iterable<EventLoop>{
-
- void register(Channel channel);
-
- EventLoop next();
-
- @Override
- Iterator
iterator() ; - }
- 复制代码
EventLoop
继承EventLoopGroup,并且可以查到父Group
- public interface EventLoop extends EventLoopGroup {
- EventLoopGroup parent();
- }
- 复制代码
SingleThreadEventExecutor
本来打算把EventExecutor去掉,代码写在EventLoop,但这个类太重要了,所以保留了下来
- public abstract class SingleThreadEventExecutor implements Executor {
- /**
- * 默认任务列表长度
- */
- protected static final int DEFAULT_MAX_PENDING_TASKS = 16;
- /**
- * 待完成的任务
- */
- private final Queue<Runnable> taskQueue;
- /**
- * 实际工作者
- */
- private final Executor executor;
- /**
- * 当前运行线程
- */
- private volatile Thread thread;
-
- /**
- * ST_NOT_STARTED: 未启动, ST_STARTED 已启动
- */
- private static final int ST_NOT_STARTED = 1;
- private static final int ST_STARTED = 2;
-
- /**
- * 标记是否启动
- */
- private volatile int state = ST_NOT_STARTED;
-
- /**
- * 原子启动标记更新器
- */
- private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
-
- public SingleThreadEventExecutor(Executor executor) {
- taskQueue = newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
- this.executor = executor;
- }
-
- /**
- * 初始化一个新的任务对列
- * @param maxPendingTasks
- * @return
- */
- protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
- return new LinkedBlockingQueue<>(maxPendingTasks);
- }
-
- /**
- * 添加任务
- * @param task
- */
- protected void addTask(Runnable task) {
- if (task == null) {
- throw new NullPointerException("task");
- }
- if (!taskQueue.offer(task)) {
- throw new RejectedExecutionException("event executor terminated");
- }
- }
-
- /**
- * 检查是否有任务
- *
- * @return
- */
- protected boolean hasTasks() {
- return !taskQueue.isEmpty();
- }
-
- /**
- * 运行所有任务
- * @return
- */
- protected boolean runAllTasks() {
- // 省略乱七八糟的判断,把多个子方法简化
- Runnable task = taskQueue.poll();
- if (task == null) {
- return false;
- }
- for (;;) {
- task.run();
- task = taskQueue.poll();
- if (task == null) {
- return true;
- }
- }
- }
-
- @Override
- public void execute(Runnable task) {
- addTask(task);
- startThread();
- }
-
- /**
- * 开启线程执行(判断已启动过不再启动)
- */
- private void startThread() {
- // 未启动才能启动,也就是只启动一次
- if (state == ST_NOT_STARTED) {
- // 再次CAS判断避免线程不安全
- if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
- doStartThread();
- }
- }
- }
-
- /**
- * 实际开启线程执行run方法
- */
- private void doStartThread() {
- // 使用真实的执行者执行任务
- executor.execute(() -> {
- // 保存执行的 线程
- thread = Thread.currentThread();
- // 省去乱起八遭的判断
- SingleThreadEventExecutor.this.run();
- // 如果执行结束,则报错
- System.out.println("Buggy EventExecutor implementation; SingleThreadEventExecutor.confirmShutdown() must be called before run() implementation terminates");
- });
- }
-
- /**
- * 抽象run方法,是一个不能运行结束的方法(除非手动关闭),即loop
- */
- protected abstract void run();
-
- }
- 复制代码
可以这么概括它:SingleThreadEventExecutor是一种特殊的任务执行器,第一次收到任务它就会启动(开启一个线程运行run方法,run方法是一个会一直执行的方法,否则报错),当任务来时并不是立即执行,它们会加入到自己的任务对列中,并且按自己的套路(run)去执行
SingleThreadEventLoop
单线程事件循环器
- public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
-
- private final Collection<EventLoop> selfCollection = Collections.<EventLoop>singleton(this);
-
- private final EventLoopGroup parent;
-
- public SingleThreadEventLoop(EventLoopGroup parent, Executor executor) {
- super(executor);
- this.parent = parent;
- }
-
- @Override
- public EventLoopGroup parent() {
- return parent;
- }
-
-
- @Override
- public void register(Channel channel) {
- // netty源码 promise.channel().unsafe().register(this, promise); 简化不区分unsafe,如下
- channel.register(this);
- }
-
- @Override
- public EventLoop next() {
- return this;
- }
-
- @Override
- public Iterator<EventLoop> iterator() {
- return selfCollection.iterator();
- }
-
- }
- 复制代码
这个类其实就是实现了parent(),和next() ,iterator()这种应对Group接口的方法,之所以敢称单线程,是因为他爹是单线程处理器SingleThreadEventExecutor,它的存在就是为了让子类不用再处理与Group的关系
NioEventLoop
重点来了,Nio版的事件事件循环器
- public class NioEventLoop extends SingleThreadEventLoop {
-
- private Selector selector;
-
- private final SelectorProvider provider;
-
- public NioEventLoop(NioEventLoopGroup parent, SelectorProvider selectorProvider, Executor executor) {
- super(parent, executor);
- this.provider = selectorProvider;
- this.selector = openSelector();
- }
-
- public SelectorProvider selectorProvider() {
- return provider;
- }
-
- private Selector openSelector() {
- try {
- return selectorProvider().openSelector();
- } catch (IOException e) {
- e.printStackTrace();
- return null;
- }
- }
-
- /**
- * 由于没做装饰,所以selector即unwrappedSelector
- *
- * @return
- */
- public Selector unwrappedSelector() {
- return selector;
- }
-
- /**
- * 运行
- */
- @Override
- protected void run() {
- for (;;) {
- try {
- select();
- } catch (IOException e) {
- e.printStackTrace();
- }
- try {
- processSelectedKeys();
- } finally {
- runAllTasks();
- }
- }
- }
-
- private void select() throws IOException {
- // 拿到多路复用器
- Selector selector = this.selector;
- for (;;) {
- // 等待,简化固定1秒
- int selectedKeys = selector.select(1000);
- // 如果有事件发生或当前有任务跳出循环
- if (selectedKeys != 0 || hasTasks()) {
- break;
- }
- }
- }
-
- private void processSelectedKeys() {
- processSelectedKeysPlain(selector.selectedKeys());
- }
-
- private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
- if (selectedKeys.isEmpty()) {
- return;
- }
- Iterator<SelectionKey> i = selectedKeys.iterator();
- for (;;) {
- final SelectionKey k = i.next();
- final Object a = k.attachment();
- i.remove();
- // 获取注册时绑定的参数
- if (a instanceof AbstractNioChannel) {
- processSelectedKey(k, (AbstractNioChannel) a);
- } else {
- // 由于手写简版只attach了AbstractNioChannel所以不会出现,但源码有其它的attach
- }
- if (!i.hasNext()) {
- break;
- }
- }
- }
-
- /**
- * 处理单个事件
- * @param k
- * @param ch
- */
- private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
- // 这个try是为了和源码尽量长得像,简版不处理异常
- try {
- // 获取发生的事件标识
- int readyOps = k.readyOps();
- // 如果是read事件 或 accpet事件
- if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
- // channel读取
- ch.read();
- }
- } catch (Exception e) {
-
- }
- }
- }
- 复制代码
内部携带了一个多路复用器,作为一个SingleThreadEventExecutor,它的运行套路是不断的监听selector,如果任务队列有任务,就处理任务,这里我简化了代码,监听1秒再去查看是否有任务,没有再回来监听,源码是有个策略判断该执行任务还是该阻塞到selector上
MultithreadEventLoopGroup
它存在的意义和SingleThreadEventLoop差不多,帮助子类处理group和成员的关系,实现了next方法(这里我简化了用轮训,源码可以自定义chooser)
- public abstract class MultithreadEventLoopGroup implements EventLoopGroup {
-
- private final EventLoop[] children;
- /**
- * 为了迭代用
- */
- private final Set<EventLoop> readonlyChildren;
-
- public MultithreadEventLoopGroup(int nThreads, Executor executor) {
- if (nThreads <= 0) {
- throw new IllegalArgumentException();
- }
- if (executor == null) {
- executor = new ThreadPerTaskExecutor(new DefaultThreadFactory());
- }
- this.children = new EventLoop[nThreads];
- for (int i = 0; i < nThreads; i ++) {
- children[i] = newChild(executor);
- }
-
- /**
- * 为了迭代用
- */
- Set<EventLoop> childrenSet = new LinkedHashSet<EventLoop>(children.length);
- Collections.addAll(childrenSet, children);
- readonlyChildren = Collections.unmodifiableSet(childrenSet);
- }
-
- protected abstract EventLoop newChild(Executor executor);
-
- /**
- * 源码用一个chooser对象选择子线程,这里简化一下,就轮训吧
- * @return
- */
- int i=0;
-
- public EventLoop chooseNext() {
- if (i>=children.length) {
- i =0;
- }
- EventLoop child = children[i];
- i++;
- return child;
- }
-
- @Override
- public void register(Channel channel) {
- next().register(channel);
- }
-
- @Override
- public EventLoop next() {
- return chooseNext();
- }
-
- @Override
- public Iterator<EventLoop> iterator() {
- return readonlyChildren.iterator();
- }
-
- @Override
- public void execute(Runnable command) {
- next().execute(command);
- }
- }
- 复制代码
抽象了newChild交给子类去实际创建组成员
NioEventLoopGroup
NioEventLoop的组,实现了newChild创建组成员即:NioEventLoop
- public class NioEventLoopGroup extends MultithreadEventLoopGroup {
-
- public NioEventLoopGroup(int nThreads) {
- super(nThreads, null);
- }
-
- public NioEventLoopGroup(int nThreads, Executor executor) {
- super(nThreads, executor);
- }
-
- @Override
- protected EventLoop newChild(Executor executor) {
- return new NioEventLoop(this, SelectorProvider.provider(), executor);
- }
- }
- 复制代码
ThreadPerTaskExecutor 和 DefaultThreadFactory
这俩的存在主要是给提供真实线程,并统一命名
- public class ThreadPerTaskExecutor implements Executor {
- private final ThreadFactory threadFactory;
-
- public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
- if (threadFactory == null) {
- throw new NullPointerException("threadFactory");
- }
- this.threadFactory = threadFactory;
- }
-
- @Override
- public void execute(Runnable command) {
- threadFactory.newThread(command).start();
- }
- }
- 复制代码
- public class DefaultThreadFactory implements ThreadFactory {
- private AtomicInteger no = new AtomicInteger(0);
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "nio-thread-"+(no.incrementAndGet()));
- }
- }
- 复制代码
channel和eventLoop都定义完了,接下来就要给他们串联起来,做启动类了,由于有服务端和客户端两种启动类,所以还是抽象了一个Bootstrap
AbstractBootstrap
- public abstract class AbstractBootstrap extends AbstractBootstrap, C>, C extends Channel> {
-
- volatile EventLoopGroup group;
-
- /**
- * 源码使用工厂模式存储是channelFactory,这里简化处理
- */
- private volatile Channel channel;
-
- AbstractBootstrap() {
- // Disallow extending from a different package.
- }
-
- private B self() {
- return (B) this;
- }
-
- public B group(EventLoopGroup group) {
- this.group = group;
- return self();
- }
-
- public B channel(Class extends C> channelClass) {
- try {
- channel = channelClass.getConstructor().newInstance();
- } catch (Exception e) {
- }
- return self();
- }
-
- public void bind(int inetPort) {
- doBind(new InetSocketAddress(inetPort));
- }
-
- private void doBind(final SocketAddress localAddress) {
- initAndRegister();
- // 让channel绑定的线程去实际绑定
- channel.eventLoop().execute(()->{
- channel.bind(localAddress);
- });
- }
-
- final void initAndRegister() {
- init(channel);
- group.register(channel);
- }
-
- abstract void init(Channel channel);
- }
- 复制代码
抽象了一个channel init方法,如果前面看懂了,这里应该能猜到主要是为了给ServerChannel添加默认的handler
ServerBootstrap
- public class ServerBootstrap extends AbstractBootstrap
{ -
- private volatile EventLoopGroup childGroup;
-
- /**
- * 这里简化处理
- */
- private volatile ChannelHandler[] childHandlers;
-
- public ServerBootstrap() { }
-
- public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
- super.group(parentGroup);
- this.childGroup = childGroup;
- return this;
- }
-
- public ServerBootstrap childHandler(ChannelHandler... childHandlers) {
- this.childHandlers = childHandlers;
- return this;
- }
-
- @Override
- void init(Channel channel) {
- ChannelPipeline p = channel.pipeline();
- // 这里给ServerChannel添加管道处理器,简化了代码
- p.addLast(new ServerBootstrapAcceptor(childGroup, childHandlers));
- }
-
- private static class ServerBootstrapAcceptor implements ChannelInboundHandler {
-
- private final EventLoopGroup childGroup;
- private final ChannelHandler[] childHandlers;
-
- private ServerBootstrapAcceptor(EventLoopGroup childGroup, ChannelHandler[] childHandlers) {
- this.childGroup = childGroup;
- this.childHandlers = childHandlers;
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- final Channel child = (Channel) msg;
- for (ChannelHandler childHandler : childHandlers) {
- child.pipeline().addLast(childHandler);
- }
- childGroup.register(child);
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
- // 略
- }
- }
- }
- 复制代码
子类ServerBootstrapAcceptor 就是ServerSocketChannel默认的handler,通过init方法添加上
到此,山寨版的netty写完了,累了一头汗,代码的命名和类命名尽量和源码保持一致,因为都是复制过来的,有些很复杂的地方做了简化处理,但个人感觉核心的代码除了bytebuf应该都写上了,使用方式开头有写基本和netty差不多,还是那句话,手写的目的是理解netty源码
篇幅有限,这里面的事好多代码都没细讲,但代码也摘的很轻量,完全可以自行理解,回头再看netty,基本就差不多
回想一下,其实netty的核心概念就总结出来了:channel维护了jdk的通道,并可以设置后置处理器实现,EventLoop是针对channel事件的专用线程,而EventLoopGroup是它们组合即专用线程池