• 7、Nacos服务注册服务端源码分析(六)


    本文收录于专栏 Nacos 中 。


    前言

    上文我们结束了客户端注册中,涉及到的Event逻辑。我们发现事件流转之后,程序走到了一个任务执行引擎NacosDelayTaskExecuteEngine中。

    继续查看后续源码之前,我们做一个回顾,梳理下NamingSubscriberServicecV2ImplServiceChangedEvent被订阅之后的流程:

    1. 当前事件被组装成PushDelayTask添加到PushDelayTaskExecuteEngine中的ConcurrentHashMap tasks中。
    2. NacosDelayTaskExecuteEngine中维护的延时线程池ScheduledExecutorService会定时扫描tasks,然后交由PushDelayTaskProcessor1处理。

    PushDelayTaskProcessor处理流程如下:

    @Override
    public boolean process(NacosTask task) {
        PushDelayTask pushDelayTask = (PushDelayTask) task;
        Service service = pushDelayTask.getService();
        NamingExecuteTaskDispatcher.getInstance()
                .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
        return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    从上文中我们可以看到几个关键类,PushDelayTaskExecuteEngineNacosDelayTaskExecuteEngineNamingExecuteTaskDispatcherPushExecuteTask。接下来我们逐个分析下这几个类,消化理解其中的设计和逻辑。

    一、Nacos的任务设计中有哪些关键类?

    首先,我们整体梳理下Nacos中任务处理的几个关键类,了解顶层代码设计之后,后续接触其不同分支实现时,也就可以做到在整体性上的认知是准确的🙆。

    定义任务:NacosTask

    /**
     * Nacos task.
     *
     * @author xiweng.yy
     */
    public interface NacosTask {
        
        /**
         * Judge Whether this nacos task should do.
         *
         * @return true means the nacos task should be done, otherwise false
         */
        boolean shouldProcess();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    顶层接口中只定义了一个方法shouldProcess(),用于判断当前任务是否需要被执行。

    执行任务:NacosTaskProcessor

    /**
     * Task processor.
     *
     * @author Nacos
     */
    public interface NacosTaskProcessor {
        
        /**
         * Process task.
         *
         * @param task     task.
         * @return process task result.
         */
        boolean process(NacosTask task);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    任务执行的顶层接口中也只有一个方法,入参是NacosTask,出参是一个代表是当前任务处理成功与否的布尔值。

    执行引擎:NacosTaskExecuteEngine

    public interface NacosTaskExecuteEngine<T extends NacosTask> extends Closeable {
        
        /**
         * Get Task size in execute engine.
         * 获取当前引擎中需要执行任务的数量,也就是NacosTask的数量
         * @return size of task
         */
        int size();
        
        /**
         * Whether the execute engine is empty.
         * 判断当前引擎中是否还有需要执行的NacosTask
         * @return true if the execute engine has no task to do, otherwise false
         */
        boolean isEmpty();
        
        /**
         * Add task processor {@link NacosTaskProcessor} for execute engine.
         * 添加一个任务执行类
         * @param key           key of task
         * @param taskProcessor task processor
         */
        void addProcessor(Object key, NacosTaskProcessor taskProcessor);
        
        /**
         * Remove task processor {@link NacosTaskProcessor} form execute engine for key.
         *
         * @param key key of task
         */
        void removeProcessor(Object key);
        
        /**
         * Try to get {@link NacosTaskProcessor} by key, if non-exist, will return default processor.
         *
         * @param key key of task
         * @return task processor for task key or default processor if task processor for task key non-exist
         */
        NacosTaskProcessor getProcessor(Object key);
        
        /**
         * Get all processor key.
         *
         * @return collection of processors
         */
        Collection<Object> getAllProcessorKey();
        
        /**
         * Set default task processor. If do not find task processor by task key, use this default processor to process
         * task.
         *
         * @param defaultTaskProcessor default task processor
         */
        void setDefaultTaskProcessor(NacosTaskProcessor defaultTaskProcessor);
        
        /**
         * Add task into execute pool.
         *
         * @param key  key of task
         * @param task task
         */
        void addTask(Object key, T task);
        
        /**
         * Remove task.
         *
         * @param key key of task
         * @return nacos task
         */
        T removeTask(Object key);
        
        /**
         * Get all task keys.
         *
         * @return collection of task keys.
         */
        Collection<Object> getAllTaskKeys();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    通过代码我们可以看出执行引擎的作用就是组织当前类型的任务,然后组织任务(NacosTask)任务执行者(NacosTaskProcessor) 的关联关系。

    二、PushDelayTaskExecuteEngine、NacosExecuteTaskExecuteEngine

    在这里插入图片描述
    我们先看顶层接口NacosTaskExecuteEngine的抽象实现AbstractNacosTaskExecuteEngine,我们上边已经贴过接口NacosTaskExecuteEngine的代码,知道里其中的接口逻辑主要是面对NacosTaskNacosTaskProcessor的。那么这里我们主要看下这个抽象模版类实现的私有变量即可:

    //组织NacosTask和NacosTaskProcessor的关联关系
    private final ConcurrentHashMap<Object, NacosTaskProcessor> taskProcessors = new ConcurrentHashMap<>();
    
    private NacosTaskProcessor defaultTaskProcessor;
    
    • 1
    • 2
    • 3
    • 4

    它有两个私有变量,分别是ConcurrentHashMap类型的缓存,存放的是任务处理类接口NacosTaskProcessor,还有一个默认的任务处理类defaultTaskProcessor,在缓存中没有查找到对应处理类时,使用这个默认处理类去处理。

    NacosDelayTaskExecuteEngine

    这个类一看就是去延迟处理执行任务的引擎,我们看下代码是如何设计的:

    private final ScheduledExecutorService processingExecutor;
    
    protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
    
    public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
        super(logger);
        //待处理任务的缓存
        tasks = new ConcurrentHashMap<>(initCapacity);
        //一个单线程的处理任务线程池
        processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
        //开启线程池,间隔processInterval时间,执行ProcessRunnable
        processingExecutor
                .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
    }
    
    //ProcessRunnable的设计就是实现Runnable,重写run方法,处理task
    private class ProcessRunnable implements Runnable {
         @Override
         public void run() {
             try {
                 processTasks();
             } catch (Throwable e) {
                 getEngineLog().error(e.toString(), e);
             }
         }
     }
    
    /**
     * process tasks in execute engine.
     */
    protected void processTasks() {
    	//从缓存中获取所有待处理的task
        Collection<Object> keys = getAllTaskKeys();
        for (Object taskKey : keys) {
        	//先从缓存中移除这个task,因为默认这个task接下来会被处理掉
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
                continue;
            }
            //从父类方法中获取需要处理当前task的任务处理类
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
                // ReAdd task if process failed
                if (!processor.process(task)) {
                	//如果任务处理失败,那么将任务重新添加到缓存中
                    task.setLastProcessTime(System.currentTimeMillis());
            		addTask(key, task);
                }
            } catch (Throwable e) {
                getEngineLog().error("Nacos task execute error ", e);
                retryFailedTask(taskKey, task);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    总结:通过定义一个延时执行的线程池定时去扫描task缓存,执行任务

    PushDelayTaskExecuteEngine

    public PushDelayTaskExecuteEngine(ClientManager clientManager, ClientServiceIndexesManager indexesManager,
                                      ServiceStorage serviceStorage, NamingMetadataManager metadataManager,
                                      PushExecutor pushExecutor, SwitchDomain switchDomain) {
        super(PushDelayTaskExecuteEngine.class.getSimpleName(), Loggers.PUSH);
        this.clientManager = clientManager;
        this.indexesManager = indexesManager;
        this.serviceStorage = serviceStorage;
        this.metadataManager = metadataManager;
        this.pushExecutor = pushExecutor;
        this.switchDomain = switchDomain;
        //设置默认的任务处理器
        setDefaultTaskProcessor(new PushDelayTaskProcessor(this));
    }
    
    private static class PushDelayTaskProcessor implements NacosTaskProcessor {
         
         private final PushDelayTaskExecuteEngine executeEngine;
         
         public PushDelayTaskProcessor(PushDelayTaskExecuteEngine executeEngine) {
             this.executeEngine = executeEngine;
         }
         
         @Override
         public boolean process(NacosTask task) {
             PushDelayTask pushDelayTask = (PushDelayTask) task;
             Service service = pushDelayTask.getService();
             NamingExecuteTaskDispatcher.getInstance()
                     .dispatchAndExecuteTask(service, new PushExecuteTask(service, executeEngine, pushDelayTask));
             return true;
         }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    设置了默认的任务处理器PushDelayTaskProcessor,那么看到这里我们就回到了我们前言中开头说到的环节了。

    三、NamingExecuteTaskDispatcher

    NamingExecuteTaskDispatcher.getInstance()很明显就是一个单例的写法了,看下dispatchAndExecuteTask()的逻辑:

    private static final NamingExecuteTaskDispatcher INSTANCE = new NamingExecuteTaskDispatcher();
    private final NacosExecuteTaskExecuteEngine executeEngine;
    
    //executeEngine是在构造方法中实例化的
    private NamingExecuteTaskDispatcher() {
        executeEngine = new NacosExecuteTaskExecuteEngine(EnvUtil.FUNCTION_MODE_NAMING, Loggers.SRV_LOG);
    }
    
    public void dispatchAndExecuteTask(Object dispatchTag, AbstractExecuteTask task) {
        executeEngine.addTask(dispatchTag, task);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    我们这里又遇到一个新的执行引擎NacosExecuteTaskExecuteEngine,它有什么特点呢?

    private final TaskExecuteWorker[] executeWorkers;
    
    • 1

    它只有这么一个私有变量,TaskExecuteWorker其实就是一个自定义之后的线程,内部封住了处理任务的一些逻辑,这里不做展开。

    四、PushExecuteTask

    我们先从PushExecuteTask这个类看起:
    在这里插入图片描述

    NacosTask

    /**
     * Judge Whether this nacos task should do.
     *
     * @return true means the nacos task should be done, otherwise false
     */
    boolean shouldProcess();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这个接口中只定义了一个方法shouldProcess(),作用是判断这个任务是否需要被处理。
    那也就意味着,Nacos中不是所有的task都会被处理的,但截止到目前我们还没有遇见这种task。
    在这里插入图片描述
    我们可以看到NacosTask有着诸多实现,每一种都对shouldProcess()方法有着不同的实现。

    AbstractExecuteTask

    /**
     * Abstract task which should be executed immediately.
     */
    public abstract class AbstractExecuteTask implements NacosTask, Runnable {
        protected static final long INTERVAL = 3000L;
        
        @Override
        public boolean shouldProcess() {
            return true;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这是一个抽象类,默认这种task是需要被处理的。除此之外,这个抽象类还实现了Runnable2,那就要着重关注其子类重写的run()方法了。

    PushExecuteTask

    回到PushExecuteTask,其父类实现了Runnable,那我们便先看本类的run()方法入手。

    @Override
    public void run() {
        try {
        	//包装推送数据
            PushDataWrapper wrapper = generatePushData();
            ClientManager clientManager = delayTaskEngine.getClientManager();
            for (String each : getTargetClientIds()) {
                Client client = clientManager.getClient(each);
                if (null == client) {
                    // means this client has disconnect
                    continue;
                }
                //获取订阅者
                Subscriber subscriber = client.getSubscriber(service);
                // skip if null
                if (subscriber == null) {
                    continue;
                }
                //执行推送逻辑
                delayTaskEngine.getPushExecutor().doPushWithCallback(each, subscriber, wrapper,
                        new ServicePushCallback(each, subscriber, wrapper.getOriginalData(), delayTask.isPushToAll()));
            }
        } catch (Exception e) {
            Loggers.PUSH.error("Push task for service" + service.getGroupedServiceName() + " execute failed ", e);
            delayTaskEngine.addTask(service, new PushDelayTask(service, 1000L));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    因为我们目前还没有查看客户端订阅相关逻辑,所以暂时不再继续查看和客户端订阅相关的代码。


    总结

    NamingSubscriberServiceV2Impl

    1. 当前事件被组装成PushDelayTask添加到PushDelayTaskExecuteEngine中的ConcurrentHashMap tasks中。
    2. NacosDelayTaskExecuteEngine中维护的延时线程池 ScheduledExecutorService会定时扫描tasks,然后交由PushDelayTaskProcessor1处理。

    PushDelayTaskProcessor

    1. 将当前PushDelayTask封装成PushExecuteTask
    2. 封装后的task,交由NamingExecuteTaskDispatcher做分发
    3. NamingExecuteTaskDispatcher会将任务交由执行引擎NacosExecuteTaskExecuteEngine 执行
    4. NacosExecuteTaskExecuteEngine内部封装了线程实现类TaskExecuteWorker去执行任务
    5. TaskExecuteWorker中维护了一个任务队列,其run()方法会扫描队列中的任务Runnable task = queue.take(),然后执行task.run()

    我们发现,一个简单的任务执行,在分发过程中经过了很多类的处理,在梳理源码的过程中我们要学习Nacos中对事件和任务的封装,加深对低耦合的理解。

    一个客户端注册事件,梳理到这里其实都是数据分发的逻辑,接下来我们马上就要看到数据处理的逻辑了。
    查看上述总结,相信你也知道我们接下来摇去看哪个关键类了。


    1. NacosTaskProcessor接口是任务处理代码的顶层接口,只有一个处理NacosTask的方法。从这里可以看出Nacos中关于解耦的工作是做了良好的设计的。 ↩︎ ↩︎

    2. 抽象类实现接口有什么意义? ↩︎

  • 相关阅读:
    【Spring Cloud】Spring Cloud Oauth2 + Gateway 微服务权限管理方案
    Qt不规则可移动窗体的实现
    美国阿贡国家实验室发布快速自动扫描套件 FAST,助力显微技术「快速阅读」成为可能
    图解2022年城市人口迁移趋势
    【构建ML驱动的应用程序】第 4 章 :获取初始数据集
    37 深度学习(一):查看自己显卡的指令|张量|验证集|分类问题|回归问题
    k8s教程(15)-pod之亲和性与互斥性调度
    vue的路由懒加载
    2021-10《信息资源管理 02378》真卷(独家文字版),圈定章节考点+统计真题分布
    Java中线程的状态
  • 原文地址:https://blog.csdn.net/Paranoia_ZK/article/details/133356315