• Netty源码剖析之Netty启动流程


    准备

    1、NettyServer

    public class NettyServer {
    
        public static void main(String[] args) throws InterruptedException {
    
            // 1、创建bossGroup线程组:处理网络连接事件。默认线程数:2*处理器线程数
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            // 2、创建workGroup线程组:处理网络read/write事件。 默认线程数:2*处理器线程数
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            // 3、创建服务端启动助手
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // 4、服务端启动助手,设置线程组
            serverBootstrap.group(bossGroup,workerGroup)
                    // 5、设置服务端Channel实现类
                    .channel(NioServerSocketChannel.class)
                    // 6、设置bossGroup线程队列中等待连接个数
                    .option(ChannelOption.SO_BACKLOG,128)
                    // 7、设置workerGroup中线程活跃状态
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    // 使用channelInitializer 可以配置多个handler
                    .childHandler(new ChannelInitializer<SocketChannel>() {// 8、设置一个通道初始化对象
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 9、向pipeline中添加自定义的channelHandler, 处理socketChannel传送的数据
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });
    
            // 10、服务端启动并绑定端口
            ChannelFuture future = serverBootstrap.bind(9999).sync();
            // 给服务器启动绑定结果,对结果进行监听,触发回调
            future.addListener((ChannelFuture channelFuture)-> {
                if(channelFuture.isSuccess()){
                    System.out.println("服务器启动成功");
                }else {
                    System.out.println("服务器启动失败");
                }
            });
    
    
            // 11、关闭监听通道和连接池,将异步改同步
            future.channel().closeFuture().sync();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    2、NettyServerHandler

    /**
     * 自定义的channelHandler处理器
     *
     * 事件触发,触发相应函数
     */
    public class NettyServerHandler implements ChannelInboundHandler {
    
        /**
         * 通道读取事件
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuffer = (ByteBuf)msg;
            System.out.println("客户端:"+byteBuffer.toString(CharsetUtil.UTF_8));
        }
    
        /**
         * 通道数据读取完毕事件
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            TimeUnit.SECONDS.sleep(2);
            ctx.writeAndFlush(Unpooled.copiedBuffer("叫我靓仔!!!".getBytes()));
        }
    
        /**
         * 发生异常捕获事件
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
        /**
         * 通道就绪事件
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
        }
    
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    
        }
    
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    3、NettyClient

    /**
     * nettyClient
     */
    public class NettyClient {
    
        public static void main(String[] args) throws InterruptedException {
            // 1、创建线程组
            NioEventLoopGroup group = new NioEventLoopGroup();
            // 2、创建客户端启动助手bootstrap
            Bootstrap bootstrap = new Bootstrap();
            // 3、配置线程组
            bootstrap.group(group)
                    // 4、定义socketChannel的实现类
                    .channel(NioSocketChannel.class)
                    // 5、定义channelHandler, 处理socketChannel的数据
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //6、向pipeline中添加自定义业务处理handler
                            ch.pipeline().addLast(new NettyClientHandler());
                        }
                    });
    
            // 7、启动客户端, 等待连接服务端, 同时将异步改为同步
            ChannelFuture future = bootstrap.connect(new InetSocketAddress(9999)).sync();
            // 8、关闭通道和关闭连接池
            future.channel().closeFuture().sync();
            group.shutdownGracefully();
    
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    4、NettyClientHandler

    /**
     * 自定义的channelHandler处理器
     * 

    * 事件触发,触发相应函数 */ public class NettyClientHandler implements ChannelInboundHandler { /** * 通道读取事件 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; System.out.println("服务端:" + byteBuf.toString(CharsetUtil.UTF_8)); } /** * 通道数据读取完毕事件 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("不行,不行啊!!!".getBytes())); } /** * 发生异常捕获事件 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 通道就绪事件 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("你好哇 小客客!!!".getBytes())); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92

    Netty启动流程

    1、绑定端口,封装程SocketAddress
    在这里插入图片描述
    在这里插入图片描述

    2、创建初始化Channel,将NioServerSocketChannel绑定到Boss
    NioEventLoopGroup中的EventLoop中的Selector上,指定Selector监听事件为accept

    在这里插入图片描述

    2.1 反射创建NioServerSocketChannel

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述

    给ServerSocketChannel绑定id(唯一标识),NioMessageUnsafe(channel数据读写类),ChannelPipeline(channel业务处理管道,可以设置许多ChannelHandler进行编解码,业务处理)

    在这里插入图片描述
    1、感兴趣的事件是 0 指客户端建立连接
    2、ServerSocketChannel开启非堵塞模式,阻塞模式会怎么样?试想下,如果服务器通道阻塞,一个客户端SocketChannel与ServerSocketChannel建立连接,进行相关操作,其它的客户端怎么办?通道被占用了啊,当然这是我的理解

    2.2 初始化NioServerSocketChannel

    在这里插入图片描述

    @Override
        void init(Channel channel) {
            setChannelOptions(channel, newOptionsArray(), logger);
            setAttributes(channel, newAttributesArray());
    
            ChannelPipeline p = channel.pipeline();
    
            // 获取workerGroup
            final EventLoopGroup currentChildGroup = childGroup;
            /**
             * ServerBootStrap通过childHandler添加的childHandler,最终会在下面的ServerBootstrapAcceptor中添加进pipeline
             */
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
    
            /**
             * 向pipeline中添加一个channelHandler
             * ChannelInitializer对象也是一个ChannelHandler
             * TODO 这里向pipeline里添加了一个ChannelHandler,会在哪里被触发呢?
             * 答:这里的channelHandler对象会在DefaultChannelPipeline#callHandlerAddedForAllHandlers方法里执行
             * 需要注意的是,这里是在ChannelInitializer抽象类的handlerAdded里调用了initChannel的抽象方法,从而调进
             * 此处的匿名对象类方法里。
             *
             */
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) {
                    final ChannelPipeline pipeline = ch.pipeline();
    
                    // TODO 这里是ServerBootStrap中设置的handler
                    ChannelHandler handler = config.handler();
    
                    /**
                     * 将ServerBootStrap在初始化的时候添加的ChannelHandler在此处真正的添加进pipeline中(在ServerTest中ServerBootStrap#childHandler添加的)
                     */
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
    
    
                    /**
                     * 拿出NioServerSocketChannel绑定的NioEventLoop来执行以下线程
                     *
                     * TODO 这里addLast进来的ServerBootstrapAccetor是用于workerGroup注册客户端用的!
                     * TODO 这里的pipeline扮演这很重要的作用。
                     */
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    ServerSocketChannel初始化本质上就是为绑定的ChannelPipeline(链表)设置ChannelInitializer(ChannelHandler)。这个ChannelInitializer 主要作用:
    1、为ServerSocketChannel, 设置 Channelhandler(自己配置的)
    2、通过NioServerSocketChannel绑定的NioEventLoop来执行以下线程任务
    ServerBootstrapAccetor是用于workerGroup注册客户端用的! 这个功能后面debug展示

    2.3、ServerSocketChannel注册到NioEventLoop中的Selector中

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

    就是从BossEventLoopGroup中拿一个NioEventLoop出来,将ServerSocketChannel注册到NioEventLoop上
    在这里插入图片描述
    在这里插入图片描述

    获取到SocketChannel在创建的时候创建的NioMessageUnsafe类,进行注册

    在这里插入图片描述
    通过NioEvetLoop 进行 一个通道注册任务,执行任务

    private void execute(Runnable task, boolean immediate) {
    
            // 判断当前线程是在NioEventLoop线程内,还是在外部线程
            boolean inEventLoop = inEventLoop();
            /**
             * 这里添加的Task是一个注册NioServerSocketChannel的任务!
             * 是AbstractChannel$AbstractUnsafe的一个匿名Runnable类
             */
            addTask(task);
    
            // 如果是外部线程调用的
            if (!inEventLoop) {
                /**
                 * 这里启动一个线程,就是NioEventLoop!!启动的是NioEventLoop
                 */
                startThread();
                if (isShutdown()) {
                    boolean reject = false;
                    try {
                        if (removeTask(task)) {
                            reject = true;
                        }
                    } catch (UnsupportedOperationException e) {
                        // The task queue does not support removal so the best thing we can do is to just move on and
                        // hope we will be able to pick-up the task before its completely terminated.
                        // In worst case we will log on termination.
                    }
                    if (reject) {
                        reject();
                    }
                }
            }
    
            if (!addTaskWakesUp && immediate) {
               
                 */
                wakeup(inEventLoop);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    /**
         * 核心工作就是启动NioEventLoop线程
         */
        private void doStartThread() {
            assert thread == null;
    
            /**
             * 1. 这里executor是ThreadExecutorMap类中的一个匿名Executor内部类,是Executor apply()这个方法返回的实例对象
             * 2. 这里传入的execute()里面的匿名内部类是SingleThreadEventExecutor中的,所以
             *      是SingleThreadEventExecutor$对象
             * 3. 执行的runnable是FastThreadLocalRunnable对象
             */
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    if (interrupted) {
                        thread.interrupt();
                    }
    
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        // 核心,就是启动NioEventLoop!!!
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        for (;;) {
                            int oldState = state;
                            if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                                break;
                            }
                        }
    
                        // Check if confirmShutdown() was called at the end of the loop.
                        if (success && gracefulShutdownStartTime == 0) {
                            if (logger.isErrorEnabled()) {
                                logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                        SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                        "be called before run() implementation terminates.");
                            }
                        }
    
                        try {
                            // Run all remaining tasks and shutdown hooks. At this point the event loop
                            // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                            // graceful shutdown with quietPeriod.
                            for (;;) {
                                if (confirmShutdown()) {
                                    break;
                                }
                            }
    
                            // Now we want to make sure no more tasks can be added from this point. This is
                            // achieved by switching the state. Any new tasks beyond this point will be rejected.
                            for (;;) {
                                int oldState = state;
                                if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                        SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                                    break;
                                }
                            }
    
                            // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                            // No need to loop here, this is the final pass.
                            confirmShutdown();
                        } finally {
                            try {
                                cleanup();
                            } finally {
                                // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                                // the future. The user may block on the future and once it unblocks the JVM may terminate
                                // and start unloading classes.
                                // See https://github.com/netty/netty/issues/6596.
                                FastThreadLocal.removeAll();
    
                                STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                                threadLock.countDown();
                                int numUserTasks = drainTasks();
                                if (numUserTasks > 0 && logger.isWarnEnabled()) {
                                    logger.warn("An event executor terminated with " +
                                            "non-empty task queue (" + numUserTasks + ')');
                                }
                                terminationFuture.setSuccess(null);
                            }
                        }
                    }
                }
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93

    在这里插入图片描述
    在这里插入图片描述

    1、判断当前线程是NioEventLoop内部线程还是外部线程(由于是主线程启动,所以是外部线程)
    2、将通道注册任务放入NioEventLoop中任务队列中
    3、包装任务,通过NioEventLoop,主要是为了从NioEventLoop的run方法开始执行
    3、NioEventLoop通过ThreadFacoty创建线程,线程开始执行NioEventLoop run方法

     /**
         * NIOEventLoop执行核心
         */
        @Override
        protected void run() {
            int selectCnt = 0;      // 阻塞选择次数
    		// 从NioEventLoop中的 taskQueue中 判断是否存在事件
            for (;;) {      // 轮训注册到selector的IO事件           为什么for(;;)比while(1)好?因为for(;;)底层的指令更少,效率更高
                try {
                    int strategy;   // strategy = 0 default
                    try {
                        strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());     // 获取策略。如果有任务则使用非阻塞方式
                        switch (strategy) {
                        case SelectStrategy.CONTINUE:
                            continue;
    
                        case SelectStrategy.BUSY_WAIT:
                            // fall-through to SELECT since the busy-wait is not supported with NIO
    
                        case SelectStrategy.SELECT:         // select事件执行
                            long curDeadlineNanos = nextScheduledTaskDeadlineNanos();       // 当前截止时间
    
                            if (curDeadlineNanos == -1L) {      // 表明没有定时任务
                                curDeadlineNanos = NONE; // nothing on the calendar
                            }
                            nextWakeupNanos.set(curDeadlineNanos);
                            try {
                                if (!hasTasks()) {      // 如果没有任务,则select阻塞等待任务     任务存放在SingleThreadEventLoop
                                    // TODO 测试
                                    System.err.println("[CurrentThread = " + Thread.currentThread().getName() + "]I'm selecting... waiting for selectKey or tasks!");
                                    strategy = select(curDeadlineNanos);
                                }
                            } finally {
                                // This update is just to help block unnecessary selector wakeups
                                // so use of lazySet is ok (no race condition)
                                // 标记未唤醒状态
                                nextWakeupNanos.lazySet(AWAKE);
                            }
                            // fall through
                        default:
                        }
                    } catch (IOException e) {
                        // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                        // the selector and retry. https://github.com/netty/netty/issues/8566
                        rebuildSelector0();
                        selectCnt = 0;
                        handleLoopException(e);
                        continue;
                    }
    
                    System.err.println("[CurrentThread = " + Thread.currentThread().getName() + "] select() 调用完了,此时已经有事件进来了?");
                    selectCnt++;    // 选择次数+1
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
    
                    final int ioRatio = this.ioRatio;       // 这里的ioRatio默认是50
                    boolean ranTasks;
                    if (ioRatio == 100) {
                        try {
                            if (strategy > 0) {
                                processSelectedKeys();      // 处理选择key,处理io相关的逻辑
                            }
                        } finally {
                            ranTasks = runAllTasks();   // 处理外部线程扔到taskQueue里的任务,这里的taskQueue是一个mpscQueue
                        }
                    } else if (strategy > 0) {
                        final long ioStartTime = System.nanoTime();     // 计算处理选择key的时间
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    } else {
                        /**
                         * 在Netty中,有两种任务,普通任务和定时任务。在执行任务的时候,会把定时任务队列里的task扔进普通任务队列里,
                         * 这里的普通任务队列就是mpscQueue,接着就挨个执行mpscQueue里的任务。
                         *
                         * 任务:普通任务 、定时任务
                         * 队列:普通任务队列mpscQueue 、 定时任务队列
                         *
                         */
                        ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                    }
    
                    if (ranTasks || strategy > 0) {
                        if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                            logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                    selectCnt - 1, selector);
                        }
                        selectCnt = 0;
                    } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)       解决空轮训Bug,重置selectCnt,重新生成selector
                        selectCnt = 0;
                    }
                } catch (CancelledKeyException e) {
                    // Harmless exception - log anyway
                    if (logger.isDebugEnabled()) {
                        logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                                selector, e);
                    }
                } catch (Error e) {
                    throw (Error) e;
                } catch (Throwable t) {
                    handleLoopException(t);
                } finally {
                    // Always handle shutdown even if the loop processing threw an exception.
                    try {
                        if (isShuttingDown()) {
                            closeAll();
                            if (confirmShutdown()) {
                                return;
                            }
                        }
                    } catch (Error e) {
                        throw (Error) e;
                    } catch (Throwable t) {
                        handleLoopException(t);
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122

    就是NioEventLoop中的线程进行死循环处理事件,通过事件进行驱动处理

    1、判断NioEventLoop中的任务队列中是否存在任务 (存在通道注册任务哦!!),从而定义处理策略

     strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());     // 获取策略。如果有任务则使用非阻塞方式
    
    • 1

    在这里插入图片描述
    有任务,那么 通过 selector 获取select 值,selector如果获取这个值呢?没看懂
    在这里插入图片描述

    2、拿到的select值为0,好像是没有SelectedKey,那么执行runTask,从任务队列中获取 之前的通道注册事件

    protected boolean runAllTasks(long timeoutNanos) {
    
            // 从定时任务队列中把任务聚合到普通队列里
            fetchFromScheduledTaskQueue();
    
    
            // 从普通任务队列里拿任务
            Runnable task = pollTask();
            if (task == null) {
                afterRunningAllTasks();
                return false;
            }
    
            final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
    
                // 真正执行了任务
                safeExecute(task);
                taskTimes++;
    
                runTasks ++;
    
                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                // 当累积到64个任务的时候,这里判断是因为任务的执行是比较耗时的
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
    
                    // 如果当前时间 >= 截止时间,即已经超过了截止时间了
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
    
                // 将任务从任务队列中弹出
                task = pollTask();
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            afterRunningAllTasks();
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    在这里插入图片描述
    任务真正被执行了哦!
    在这里插入图片描述

    @Override
        protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    /**
                     * 调用selectableChannel的register方法,用于在给定的NioEventLoop上的selector上注册这个通道channel,并返回一个选择键
                     *
                     * OP_READ = 1 << 0                 读操作位
                     * OP_WRITE = 1 << 2                写操作位
                     * OP_CONNECT = 1 << 3              客户端连接到服务端操作位
                     * OP_ACCEPT = 1 << 4               服务端接受客户端链接操作位
                     *
                     * 此处调用了jdk nio的selectableChannel的register方法,传入的操作位是0,表明对任何事件都不感兴趣,仅仅是完成注册操作。
                     *
                     * 向selector注册channel成功后,会返回一个selectionKey,后续可以拿着这个selectionKey获取到channel。
                     *
                     * javaChannel()拿到的是:AbstractSelectableChannel,在其register方法里,会调用addKey,给selectionKey添加默认数组大小3
                     * 并最终调用jdk底层register
                     *
                     */
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        /**
                         * 由于未调用select#select(),因此可能仍然在缓存,而未删除但是已经取消了的selectionKey,强制调用selectNow()
                         * 将selectionKey从Selector上删除
                         */
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    拿到ServerSocketChannel,注册到NioEventLoop中的Selector中 ,注册成功返回一个SelectionKey 。 对应代码 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); 这个0表示通道注册

     @Override
        protected final SelectionKey register(AbstractSelectableChannel ch,
                                              int ops,
                                              Object attachment)
        {
            if (!(ch instanceof SelChImpl))
                throw new IllegalSelectorException();
            SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
            k.attach(attachment);
    
            // register (if needed) before adding to key set
            implRegister(k);
    
            // add to the selector's key set, removing it immediately if the selector
            // is closed. The key is not in the channel's key set at this point but
            // it may be observed by a thread iterating over the selector's key set.
            keys.add(k);
            try {
                k.interestOps(ops);
            } catch (ClosedSelectorException e) {
                assert ch.keyFor(this) == null;
                keys.remove(k);
                k.cancel();
                throw e;
            }
            return k;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    1、将ServerSocketChannel,Selector 封装成 SelectionKeyImpl(所谓的通道注册凭证)
    2、将ski (SelectionKeyImpl)放入 selector newKeys 队列
    3、将ski 放入 selector的 SelectionKey集合中
    4、设置NioEvenLoop中当前这个Selector 感兴趣的事件/操作

    至此成功将ServerSocketChannel 放入 Selector中,并设置Selector所感兴趣的事件 0

    2.4、调用ServerSocketChannel中的pipeLine,进行AddHandler

    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    这里又会走到NioEventLoop中的run方法中,然后执行上图任务,整个流程走完

    问题:

    1、Selector如何进行Select
    在NioEventLoop,ServerSocketChannel创建完后,会将channel注册到Selector中并设置通道感兴趣的事件,这里是注册通道事件。NioEventLoop会开启一个线程死循环,run方法内部 Selector会通过 select()/selectNow() 以阻塞非阻塞方式等待感兴趣的事件。具体需要看Selector 选择的源码

    如果没有任务,select会阻塞
    在这里插入图片描述
    如果有任务,以非阻塞的方式执行

    在这里插入图片描述

    如果通道有IO事件,那么进行处理,内部会对感兴趣的客户端建立连接事件/通道注册 进行处理

    在这里插入图片描述

    总结

    在这里插入图片描述

  • 相关阅读:
    中端酒店迈入“30+”,维也纳酒店如何化解行业的三大难关
    Django框架之模板层template的一些介绍和使用
    【每日一题】补档 ABC309F - Box in Box | 三维偏序 | 树状数组 | 中等
    中秋快乐! Happy Mid-autumn Festival!
    什么是网络流量监控
    Nginx-HTTPS 配置
    【数字图像处理】RGB 转灰度图
    拓端tecdat|R语言使用K-Means聚类可视化WiFi访问
    RocketMQ特性--事务消息是个啥,咋发出去的呢?
    【办公类-16-07-04】合并版“2023下学期 中班户外游戏(有场地和无场地版,一周一次)”(python 排班表系列)
  • 原文地址:https://blog.csdn.net/qq_44787816/article/details/126838705