• Android应用内组件通讯之EventBus源码分析之post流程(三)


    上一篇文章对EventBus中的初始化以及注册流程梳理了一遍,因为内容较多,所以把post流程单独分开来跟; 之前在实现Demo的时候知道了事件触发的流程是post(Object event)与postSticky(Object event)两个方法,那么这里就有三个疑问了:

    • 这个两个方法最终是怎么触发各模块的消费函数执行的?
    • 在消息分发过程中,又是如何切换线程执行的?
    • 粘性事件的分发流程与普通事件有什么不一样呢?

    带着这三个疑问,一起来跟下源码实现吧。

    提交普通事件

    普通事件的提交使用post函数完成,post函数定义在EventBus.java中,源码如下:

      public void post(Object event) {
      		//线程副本中存放的当前post运行所在线程的信息
            PostingThreadState postingState = currentPostingThreadState.get();
            //将事件提交到当前线程中的事件集合中
            List<Object> eventQueue = postingState.eventQueue;
            eventQueue.add(event);
    
            if (!postingState.isPosting) {
            	//当前post函数执行是否为主UI线程
                postingState.isMainThread = isMainThread();
                postingState.isPosting = true;
                if (postingState.canceled) {
                    throw new EventBusException("Internal error. Abort state was not reset");
                }
                try {
                	//当前线程中的事件队列,全部分发,直到集合为空
                    while (!eventQueue.isEmpty()) {
                        postSingleEvent(eventQueue.remove(0), postingState);
                    }
                } finally {
                	//事件全部推完后,重置状态
                    postingState.isPosting = false;
                    postingState.isMainThread = false;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    继续跟postSingleEvent函数

     private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
     		//获取事件势力的Class类
            Class<?> eventClass = event.getClass();
            boolean subscriptionFound = false;
            //Builder中配置的是否支持事件继承,默认是true
            if (eventInheritance) {
            	//去递归查找该事件类型的所有父类Class文件,也包括interface
            	//如果之前已经有存过,则直接使用缓存的数据
                List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
                int countTypes = eventTypes.size();
                //遍历触发事件分发
                for (int h = 0; h < countTypes; h++) {
                    Class<?> clazz = eventTypes.get(h);
                    subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
                }
            } else {
            	//不支持事件继承,则只要分发当前事件类型
                subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
            }
            
            //没有该事件类型的订阅模块
            if (!subscriptionFound) {
                if (logNoSubscriberMessages) {
                    logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
                }
                if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                        eventClass != SubscriberExceptionEvent.class) {
                    post(new NoSubscriberEvent(this, event));
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    上面的函数作用是实现了区分是否支持事件继承feature的实现,支持事件继承,则分发事件的时候,需要将该事件分发包含其父类,以及事件所有实现接口的订阅模块的消费函数中。如果不支持该feature,则只需分发到该订阅该事件的消费函数中。这个说有点太拗口了,举个例子吧

    interface MetaEvent
    open class ParentMeta(v0:Int, v1:Int) : MetaEvent
    class ChildMeta(v0 : Int) : ParentMeta(v0, 0)
    
    定义如上事件类型,定义消费函数如下
    fun eventConsumer(event:MetaEvent){
        Log.i("xx_debug", "eventConsume: $event")
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    支持事件继承feature的时候,此时发送ParentMeta,ChildMeta事件时,消费函数都能收到,当不支持该feature时,事件只能定义为具体的类型了

    fun eventConsumer(event:ParentMeta){
        Log.i("xx_debug", "eventConsume: $event")
    }
    
    • 1
    • 2
    • 3

    这样我们只能发ParentMeta对象,消费函数才能收到

    ======================================================
    好了继续往下走,才是真的消息分发逻辑,看函数postSingleEventForEventType实现

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    		//首先获取所有的订阅器,这个Subscription我们在上一篇分析过,
    		//是在register函数中创建的,里面包含了订阅模块的实例对象,通过
    		//事件类型保存在HashMap中,提高了查找速度
            CopyOnWriteArrayList<Subscription> subscriptions;
            synchronized (this) {
                subscriptions = subscriptionsByEventType.get(eventClass);
            }
            if (subscriptions != null && !subscriptions.isEmpty()) {
            	//遍历所有的订阅器,这里的订阅器都订阅了我们需要通知的
            	//事件类型
                for (Subscription subscription : subscriptions) {
                    postingState.event = event;
                    postingState.subscription = subscription;
                    boolean aborted;
                    try {
                    	//开始广播
                        postToSubscription(subscription, event, postingState.isMainThread);
                        aborted = postingState.canceled;
                    } finally {
                        postingState.event = null;
                        postingState.subscription = null;
                        postingState.canceled = false;
                    }
                    if (aborted) {
                        break;
                    }
                }
                return true;
            }
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    上面的函数比较简单,基本上实现的功能就是找到所有订阅该事件的订阅器,然后通过postToSubscription来广播,继续往下走:

     private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
     		//根据消费函数中标记的执行线程类型来处理事件
            switch (subscription.subscriberMethod.threadMode) {
                case POSTING:
                    invokeSubscriber(subscription, event);
                    break;
                case MAIN:
                    if (isMainThread) {
                        invokeSubscriber(subscription, event);
                    } else {
                        mainThreadPoster.enqueue(subscription, event);
                    }
                    break;
                case MAIN_ORDERED:
                    if (mainThreadPoster != null) {
                        mainThreadPoster.enqueue(subscription, event);
                    } else {
                        // temporary: technically not correct as poster not decoupled from subscriber
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case BACKGROUND:
                    if (isMainThread) {
                        backgroundPoster.enqueue(subscription, event);
                    } else {
                        invokeSubscriber(subscription, event);
                    }
                    break;
                case ASYNC:
                    asyncPoster.enqueue(subscription, event);
                    break;
                default:
                    throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
            }
        }
    
    //通过反射的方式,调用订阅模块对象中的消费函数
      void invokeSubscriber(Subscription subscription, Object event) {
            try {
                subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
            } catch (InvocationTargetException e) {
                handleSubscriberException(subscription, event, e.getCause());
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Unexpected exception", e);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    通过上面两个函数,我们基本上就弄清楚了第一个流程:post最终如何触发订阅模块的消费函数的。 即在invokeSubscriber方法中,通过反射的方式,来调用订阅模块的对象中的消费函数。传入的参数就是事件的对象。

    第二个问题在postToSubscription中似乎也能找到答案了,事件分发是如何切换线程的,回顾一下之前在Demo中的消费函数定义

      @Subscribe(threadMode = ThreadMode.BACKGROUND)
      fun eventConsumer(event:ParentMeta){
         Log.i("xx_debug", "eventConsume: $event")
      }
    
    • 1
    • 2
    • 3
    • 4

    可以通过注解中的threadMode中的值来确认最终函数运行在什么线程中。看下ThreadMode的定义:

    public enum ThreadMode {
        /**
         * Subscriber will be called directly in the same thread, which is posting the event. This is the default. Event delivery
         * implies the least overhead because it avoids thread switching completely. Thus this is the recommended mode for
         * simple tasks that are known to complete in a very short time without requiring the main thread. Event handlers
         * using this mode must return quickly to avoid blocking the posting thread, which may be the main thread.
         */
        POSTING,
    
        /**
         * On Android, subscriber will be called in Android's main thread (UI thread). If the posting thread is
         * the main thread, subscriber methods will be called directly, blocking the posting thread. Otherwise the event
         * is queued for delivery (non-blocking). Subscribers using this mode must return quickly to avoid blocking the main thread.
         * If not on Android, behaves the same as {@link #POSTING}.
         */
        MAIN,
    
        /**
         * On Android, subscriber will be called in Android's main thread (UI thread). Different from {@link #MAIN},
         * the event will always be queued for delivery. This ensures that the post call is non-blocking.
         */
        MAIN_ORDERED,
    
        /**
         * On Android, subscriber will be called in a background thread. If posting thread is not the main thread, subscriber methods
         * will be called directly in the posting thread. If the posting thread is the main thread, EventBus uses a single
         * background thread, that will deliver all its events sequentially. Subscribers using this mode should try to
         * return quickly to avoid blocking the background thread. If not on Android, always uses a background thread.
         */
        BACKGROUND,
    
        /**
         * Subscriber will be called in a separate thread. This is always independent from the posting thread and the
         * main thread. Posting events never wait for subscriber methods using this mode. Subscriber methods should
         * use this mode if their execution might take some time, e.g. for network access. Avoid triggering a large number
         * of long running asynchronous subscriber methods at the same time to limit the number of concurrent threads. EventBus
         * uses a thread pool to efficiently reuse threads from completed asynchronous subscriber notifications.
         */
        ASYNC
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    对照英文注释,用简单一点的话来概括各个线程模式的定义:
    ThreadMode.POSTING: post运行在哪个线程,消费函数的执行就在哪个线程。
    ThreadMode.MAIN: 不管post方法运行在什么线程中,消费函数的执行一直在主UI线程中
    ThreadMode.MAIN_ORDERED: 同ThreadMode.MAIN, 区别就是消息会加入主UI线程的Handler队列中,等主UI线程中的其他消息执行完毕后再出发消费函数的调用
    ThreadMode.BACKGROUND:post如果运行在子线程中,则消费函数的执行也是在该子线程中;post如果运行在主UI线程,则新起一个线程去执行消费函数的调用
    ThreadMode.ASYNC: post与消费函数的执行均在不同的子线程中完成。

    好了,了解了这些线程模式的定义,我们再回到postToSubscription就一目了然了,最后再说一下三个提交器,分别是:

    • AsyncPoster : 使用线程池执行队列任务
    • BackgroundPoster: 使用线程池执行队列任务
    • HandlerPoster :使用Handler来处理队列任务

    提交器的逻辑就比较简单了,不再分析。到这里基本上就理解了第二个问题,消息提交时的线程切换。 还有第三个问题,粘性事件的提交与普通事件的提交有什么不一样,看源码,postSticky(Object event)

    synchronized (stickyEvents) {
         stickyEvents.put(event.getClass(), event);
    }
    // Should be posted after it is putted, in case the subscriber wants to remove immediately
    post(event);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    哈哈,很简单,跟普通提交的唯一区别就是把事件根据其Class类型保存其对象的引用到了HashMap中。当订阅粘性事件时,如果有对应事件类型的引用,则使用postToSubscription给当前的订阅器广播事件。

    好了,到这里基本上大流程都跟万了,剩下的解注册,取消事件发布等都只是修改属性状态或者删除数据结构中的子集,比较简单,就不再分析了。

    分析了这么多,那EventBus跟我们自己写一个订阅者模式的代码有什么不一样呢?我认为优点如下:

    1. 支持广播任意数据类型的事件
    2. 支持任意类型的对象去订阅事件,无需另外定义回调
    3. 支持粘性事件
    4. 支持发布过程中的线程切换

    总结两个字就是灵活

    缺点当前是有的:那就是容易因为使用不当造成内存泄漏,所以在使用过程中要特别注意,register要与unregister配合使用,粘性事件不用时记得取消广播哟。

  • 相关阅读:
    【玩转Redhat Linux 8.0系列 | 从命令行管理文件(一)】
    UML图系列之序列图
    docker 部署 node.js(express) 服务
    费时3个月,靠着这篇软件测试进阶笔记,成功拿下了阿里、腾讯等10家offer
    精通Git(一)——入门
    激光雷达检测负障碍物(附大概 C++ 代码)
    Vue工程化
    MEMM最大熵模型
    MySQL的join你真的了解吗!!!
    【Spring Boot】Day01
  • 原文地址:https://blog.csdn.net/zjfengdou30/article/details/126155315