• 5、Nacos服务注册服务端源码分析(四)之NotifyCenter


    上篇我们讲server端处理服务注册源码时,遇到了一个关键类NotifyCenter,本篇就主要来分析下这个类。

    NotifyCenter

    这个类所在包:nacos-common

    /**
     * Unified Event Notify Center.
     */
    
    • 1
    • 2
    • 3

    通过类注释可以看出来这个类是一个统一的事件通知中心,那也就足可见这个类的重要性了。

    public class NotifyCenter
    
    • 1

    通过类定义发现这个类就是一个普通类,不会被注册为spring bean且类中的方法都是静态方法,也就是说这个类是整个进程共享的了,所以其中的内部变量必然要是线程安全的,结合源码看看是如何处理的。

    private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class);
    //这两个变量是在静态块中初始化的,所以不需要特殊处理
    public static int ringBufferSize;
    public static int shareBufferSize;
    
    private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
    //初始化EventPublisher的工厂
    private static final EventPublisherFactory DEFAULT_PUBLISHER_FACTORY;
    
    private static final NotifyCenter INSTANCE = new NotifyCenter();
    
    private DefaultSharePublisher sharePublisher;
    //SPI机制获取到EventPublisher实现之后,获取class,赋值到此
    private static Class<? extends EventPublisher> clazz;
    
    /**
     * Publisher management container.
     */
    private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);
    
    static {
        // Internal ArrayBlockingQueue buffer size. For applications with high write throughput,
        // this value needs to be increased appropriately. default value is 16384
        String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
        ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);
        
        // The size of the public publisher's message staging queue buffer
        String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
        shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);
        //采用SPI拓展机制,NacosServiceLoader内部封装了JDK的ServiceLoader,在此基础上加了一层缓存
        final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);
        Iterator<EventPublisher> iterator = publishers.iterator();
        if (iterator.hasNext()) {
            clazz = iterator.next().getClass();
        } else {
        	//默认使用DefaultPublisher
            clazz = DefaultPublisher.class;
        }
        
        DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
            try {
            	//实例化EventPublisher
                EventPublisher publisher = clazz.newInstance();
                //调用EventPublisher初始化方法
                publisher.init(cls, buffer);
                return publisher;
            } catch (Throwable ex) {
                LOGGER.error("Service class newInstance has error : ", ex);
                throw new NacosRuntimeException(SERVER_ERROR, ex);
            }
        };
        
        try {
            
            // Create and init DefaultSharePublisher instance.
            INSTANCE.sharePublisher = new DefaultSharePublisher();
            INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);
            
        } catch (Throwable ex) {
            LOGGER.error("Service class newInstance has error : ", ex);
        }
        //JVM销毁前执行的钩子函数 NotifyCenter::shutdown
        ThreadUtils.addShutdownHook(NotifyCenter::shutdown);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    查看代码可以知道关键的处理逻辑是:创建EventPublisher


    EventPublisher

    以上涉及到两个EventPublisher一个是默认的DefaultPublisher,另一个是DefaultSharePublisher
    在这里插入图片描述
    我们来一个个梳理下UML中的类:

    interface EventPublisher

    事件通知的接口,定义了事件通知的一系列方法

    /**
      * Initializes the event publisher.
      * 初始化
      * @param type       {@link Event >}
      * @param bufferSize 消息队列大小
      */
     void init(Class<? extends Event> type, int bufferSize);
     
     /**
      * The number of currently staged events.
      * 当前暂存的事件数量
      * @return event size
      */
     long currentEventSize();
     
     /**
      * Add listener.
      *	添加订阅者
      * @param subscriber {@link Subscriber}
      */
     void addSubscriber(Subscriber subscriber);
     
     /**
      * Remove listener.
      * 移除订阅者
      * @param subscriber {@link Subscriber}
      */
     void removeSubscriber(Subscriber subscriber);
     
     /**
      * publish event.
      *	发布事件
      * @param event {@link Event}
      * @return publish event is success
      */
     boolean publish(Event event);
     
     /**
      * Notify listener.
      * 通知订阅者
      * @param subscriber {@link Subscriber}
      * @param event      {@link Event}
      */
     void notifySubscriber(Subscriber subscriber, Event event);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    class DefaultPublisher
    public class DefaultPublisher extends Thread implements EventPublisher
    
    • 1

    DefaultPublisher 是事件发布接口EventPublisher的默认实现,除此之外,还继承了Thread,那么也就意味着DefaultPublisher除了有事件处理能力,本身还是一个线程。

    先看接口中定义的初始化方法init是如何实现的,从这里看起会很好的帮助我们理解这个类:

    @Override
    public void init(Class<? extends Event> type, int bufferSize) {
    	//设置为守护线程
        setDaemon(true);
        //设置线程名称,一定程度上这可以帮助我们排查问题
        setName("nacos.publisher-" + type.getName());
        //Event类型
        this.eventType = type;
        //队列长度
        this.queueMaxSize = bufferSize;
        //队列
        this.queue = new ArrayBlockingQueue<>(bufferSize);
        //Thread的start()方法
        start();
    }
    //全局变量,用来标识当前Publisher是否初始化完成
    private volatile boolean initialized = false;
    @Override
    public synchronized void start() {
        if (!initialized) {
            // start just called once
            super.start();
            if (queueMaxSize == -1) {
                queueMaxSize = ringBufferSize;
            }
            initialized = true;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    即然继承了Thread那么就肯定回去重写关键的run()方法,我们看下是如何重写的:

    @Override
    public void run() {
        openEventHandler();
    }
    
    void openEventHandler() {
        try {
            
            // This variable is defined to resolve the problem which message overstock in the queue.
            // 这个变量用于解决队列中消息积压的问题
            int waitTimes = 60;
            // To ensure that messages are not lost, enable EventHandler when
            // waiting for the first Subscriber to register
            // 没有关闭 && 没有订阅者 && waitTimes>0 --> 休眠1s,最多可以sleep 60s
            while (!shutdown && !hasSubscriber() && waitTimes > 0) {
                ThreadUtils.sleep(1000L);
                waitTimes--;
            }
    
            while (!shutdown) {
            	//如果没有获取到事件,这里会阻塞
                final Event event = queue.take();
                //处理事件
                receiveEvent(event);
                UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
            }
        } catch (Throwable ex) {
            LOGGER.error("Event listener exception : ", ex);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    可以看到,这里有一个死循环,只要没有收到shutdown事件,那么就会一直尝试从队列中获取事件去处理。这里获取事件的逻辑是从阻塞队列中取数据,所以当没有时间要处理时,当前线程会阻塞在获取事件的逻辑中,不会对cpu有过多的压力。

    /**
     * Receive and notifySubscriber to process the event.
     *
     * @param event {@link Event}.
     */
    void receiveEvent(Event event) {
        final long currentEventSequence = event.sequence();
        
        if (!hasSubscriber()) {
            LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
            return;
        }
        
        // Notification single event listener
        for (Subscriber subscriber : subscribers) {
            if (!subscriber.scopeMatches(event)) {
                continue;
            }
            
            // Whether to ignore expiration events
            if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
                LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
                        event.getClass());
                continue;
            }
            
            // Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
            // Remove original judge part of codes.
            notifySubscriber(subscriber, event);
        }
    }
    
    @Override
    public void notifySubscriber(final Subscriber subscriber, final Event event) {
        LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);
        //将订阅者对事件的处理构建成一个Runnable
        final Runnable job = () -> subscriber.onEvent(event);
        //获取订阅者处理事件的Executer,这里的设计逻辑是:将异步或者同步的决定权交给订阅者本身,
        //是否异步处理决定于订阅者对executor()方法的实现方式
        final Executor executor = subscriber.executor();
        
        if (executor != null) {
            executor.execute(job);
        } else {
            try {
            	//如果没有executor,那就直接执行Runnable
                job.run();
            } catch (Throwable e) {
                LOGGER.error("Event callback exception: ", e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    看了这么多代码,简单对DefaultPublisher做个总结:

    • 是什么?
      • 事件通知的默认实现,用于处理事件通知逻辑
      • 继承了线程
    • 有什么?
      • 订阅者集合
      • 事件集合
    • 做什么?
      • 处理事件通知,同时处理订阅者的相关逻辑(添加/删除)
    • 怎么做?
      • 初始化方法中会启动线程
      • run()处理事件,从自己的事件集合中获取事件,然后交给订阅者去处理
    class DefaultSharePublisher

    这个类继承了DefaultPublisher,设计的目的应该是处理一些比较耗时的事件。本身有一个map做缓存,没有什么特别的逻辑。

    private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings = new ConcurrentHashMap<>();
    
    • 1

    这个类去处理事件的逻辑还是其父类DefaultPublishernotifySubscriber(subscriber, event)方法。我们上文已经看过这个方法了。


    回过头再看NotifyCenter

    看完EventPublisher之后,别忘记我们为什么出发🫣。
    回过头再看NotifyCenter:我们从接收一个事件开始,从头梳理代码。

    接收事件
    /**
     * Request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is
     * actually published.
     *
     * @param event class Instances of the event.
     */
    public static boolean publishEvent(final Event event) {
        try {
            return publishEvent(event.getClass(), event);
        } catch (Throwable ex) {
            LOGGER.error("There was an exception to the message publishing : ", ex);
            return false;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    发布事件
    /**
     * Request publisher publish event Publishers load lazily, calling publisher.
     *
     * @param eventType class Instances type of the event type.
     * @param event     event instance.
     */
    private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
        if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
        	//当前事件如果是SlowEvent,那么交与sharePublisher处理
            return INSTANCE.sharePublisher.publish(event);
        }
        //获取Event的类名
        final String topic = ClassUtils.getCanonicalName(eventType);
        
        //根据事件的类名,从map中获取EventPublisher去处理当前事件
        EventPublisher publisher = INSTANCE.publisherMap.get(topic);
        if (publisher != null) {
            return publisher.publish(event);
        }
        if (event.isPluginEvent()) {
            return true;
        }
        LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    ps:Class.getCanonicalName()的用法

    EventPublisher是如何添加到publisherMap中的?
    public static void registerSubscriber(final Subscriber consumer) {
        registerSubscriber(consumer, DEFAULT_PUBLISHER_FACTORY);
    }
    public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {
        // If you want to listen to multiple events, you do it separately,
        // based on subclass's subscribeTypes method return list, it can register to publisher.
        //SmartSubscriber是为了处理一个订阅者订阅多个事件而设计的
        if (consumer instanceof SmartSubscriber) {
            for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
                // For case, producer: defaultSharePublisher -> consumer: smartSubscriber.
                if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
                    INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
                } else {
                    // For case, producer: defaultPublisher -> consumer: subscriber.
                    addSubscriber(consumer, subscribeType, factory);
                }
            }
            return;
        }
        
        final Class<? extends Event> subscribeType = consumer.subscribeType();
        if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
            INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
            return;
        }
        //添加订阅者
        addSubscriber(consumer, subscribeType, factory);
    }
    
    private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
            EventPublisherFactory factory) {
        
        final String topic = ClassUtils.getCanonicalName(subscribeType);
        synchronized (NotifyCenter.class) {
            // MapUtils.computeIfAbsent is a unsafe method.
            MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
        }
        EventPublisher publisher = INSTANCE.publisherMap.get(topic);
        if (publisher instanceof ShardedEventPublisher) {
            ((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
        } else {
            publisher.addSubscriber(consumer);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    最后的一个方法中我们看到了Map publisherMap的数据添加逻辑:根据Event的类名做为key,EventPublisher作为value。
    同时我们也看到了事件订阅者是如何和事件发布者建立关系的:publisher.addSubscriber(consumer);,publisher中有一个集合,用于存储事件的订阅者。

    总结

    好了,总结下NotifyCenter:

    • 是什么?
      • 统一的事件处理类,所有的事件通知都借助此类去处理
      • 组织事件-发布者的关联关系
    • 有什么?
      • 事件处理类EventPublisher
      • 共享事件处理类DefaultSharePublisher,用于处理耗时的事件
      • Map publisherMap
    • 做什么?
      • 注册发布者到publisherMap
      • 接收事件之后交与对应的订阅者处理
    • 怎么做?
      • 根据Event的类名去publisherMap 中获取对应的EventPublisher,然后发布事件,交与对应的订阅者处理事件

    本篇主要讲了NotifyCenter相关的事件代码逻辑,特别对 EventPublisher做了详细的分析。我们从中看到了一个事件发布与订阅解耦的设计方式,很多框架中都会使用事件驱动去设计发布-订阅模型,日常工作中如果遇到类似的情景,也可以尝试使用这种方式去实现。

  • 相关阅读:
    gcc生成shared library及可执行文件
    DataV兼容vue3的方法
    ESP-IDF学习——1.环境安装与hello-world
    WPS pdf文档合并收费?Linux平台采用pdfunite实现pdf文档合并
    使用docker快速搭建jenkins
    golang学习笔记——日志记录
    cocos creator 小游戏允许他人访问本地项目
    【案例直击】看快速开发框架如何为医疗行业数字化转型赋能?
    化工机械基础复习要点
    SpringBoot3快速入门
  • 原文地址:https://blog.csdn.net/Paranoia_ZK/article/details/133158895