目录
所谓Bus ,在计算机中就是存在主板上的总线,
在计算机上,我们的输入/输出设备种类繁多,当我们从键盘输入一个字符串, cpu 处理完成之后回显给显示器;这个过程的传输最直接的方式就是为这些设备相互之间建立线路,这样可以解决问题, 但是随着设备越来越多,私有的线路越来越繁杂,很难以维护,于是我们暂时丢弃了这个方式,采用公共的总线方式,所有的设备将 传输的数据,以及目的设备地址发送出去,总线只是为我们进行运输,这样也很方便维护
所以,软件中也借鉴了硬件中总线的背景,产生了事件总线模式,在软件中,组件模块会有很多,相互通信的协议,方式多种多样, 如果每个组件都要写一套,维护成本较高,那么我们就要用这个模式来解决这个问题.
这个模式跟发布-订阅模式,观察者模式都很类似。但相比之下,EventBus解除了通知者与观察者之间的关系,是组件之间能够进一步的解耦, 是设计模式中的观察者模式的优雅实现
传统上,Java的进程内事件分发都是通过发布者和订阅者之间的显式注册实现的。设计EventBus就是为了取代这种显示注册方式, 使组件间有了更好的解耦。使用上也非常简单
- 所涉及的类在eventbus 包下,主要包括:
- EventBus: 事件总线,以及消息的广播传输
- Subscriber: 订阅者,观察者
- Dispatcher: 事件分发,有3种实现,分别是ImmediateDispatcher(同步分发器),LegacyAsyncDispatcher(异步分发器),PerThreadQueuedDispatcher(单线程分发)
- SubscriberRegistry:订阅者注册器,通过ConcurrentMap<Class>, CopyOnWriteArraySet
> 维护了事件消息类型和 观察者之间的关系; - SubscriberExceptionContext: 订阅者异常处理器上下文;
- SubscriberExceptionHandler:异常处理器;
- DeadEvent:死亡事件,事件没有相应的订阅者处理时,则标记为一个死亡事件;
- AsyncEventBus:异步事件总线;
- <!-- guava -->
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>20.0</version>
- </dependency>
这里多创建几种不同类型的Listener消息接受类(便于理解)
- import com.google.common.eventbus.Subscribe;
-
- /**
- * @author szw
- * @version 1.0
- * @since 2022/10/20 19:54
- */
- public class EventListener1 {
-
- @Subscribe
- public void execute(String msg) {
- System.out.println("EventListener1 获得String消息" + msg);
- }
- }
-
- public class EventListener2 {
-
- @Subscribe
- public void execute(String msg){
- System.out.println("EventListener2 获得String消息" + msg);
- }
- }
-
- public class EventListener3 {
-
- @Subscribe
- public void execute(Integer msg) {
- System.out.println("EventListener3 获得Integer消息" + msg);
- }
- }
-
- public class EventListener4 {
-
- @Subscribe
- public void execute(EventEntity msg) {
- System.out.println("EventListener4 获得对象消息" + msg.toString());
- }
- }
-
- public class EventListener5 {
-
- @Subscribe
- public void execute(ChildrenEventEntity msg) {
- System.out.println("EventListener5 获得子类对象消息" + msg.toString());
- }
- }
-
-
-
- import java.io.Serializable;
-
- import lombok.Getter;
- import lombok.Setter;
- import lombok.ToString;
-
- /**
- * 消息封装类: 对象类型
- * @author szw
- * @version 1.0
- * @since 2022/10/21 09:33
- */
- @Getter
- @Setter
- @ToString
- public class EventEntity implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private Integer id;
-
- private String name;
-
- public EventEntity(Integer id, String name) {
- this.id = id;
- this.name = name;
- }
-
- public EventEntity() {
- }
- }
-
-
- /**
- * 消息封装类: 对象类型, 测试子类
- * @author szw
- * @version 1.0
- * @since 2022/10/21 09:33
- */
- @Getter
- @Setter
- @ToString
- public class ChildrenEventEntity extends EventEntity {
- private static final long serialVersionUID = 1L;
-
- private String name2;
-
- public ChildrenEventEntity(String name2) {
- this.name2 = name2;
- }
- }
-
- import com.google.common.eventbus.EventBus;
-
- /**
- * @author szw
- * @version 1.0
- * @since 2022/10/20 19:57
- */
- public class Test {
-
- public static void main(String[] args) {
- // 实现事件总线
- EventBus eventBus = new EventBus();
-
- EventListener1 eventListener1 = new EventListener1();
- EventListener2 eventListener2 = new EventListener2();
- EventListener3 eventListener3 = new EventListener3();
- EventListener4 eventListener4 = new EventListener4();
- EventListener5 eventListener5 = new EventListener5();
-
- // 进行消息订阅
- eventBus.register(eventListener1);
- eventBus.register(eventListener2);
- eventBus.register(eventListener3);
- eventBus.register(eventListener4);
- eventBus.register(eventListener5);
-
- // 通知者发送消息
- eventBus.post("发送消息 啦啦啦.....");
- eventBus.post(1);
- eventBus.post(new EventEntity(100, "EventEntity父类对象消息"));
- eventBus.post(new ChildrenEventEntity("ChildrenEventEntity子类对象消息"));
- }
- }
EventListener1 获得String消息发送消息 啦啦啦.....
EventListener2 获得String消息发送消息 啦啦啦.....
EventListener3 获得Integer消息1
EventListener4 获得对象消息EventEntity(id=100, name=EventEntity父类对象消息)
EventListener5 获得子类对象消息ChildrenEventEntity(name2=ChildrenEventEntity子类对象消息)
EventListener4 获得对象消息ChildrenEventEntity(name2=ChildrenEventEntity子类对象消息)
直接new的话也太不Spring了, 结合Spring使用的话还是用注入的方式, 这里简单实现下
- @Configuration
- @Slf4j
- public class SpringConfig {
-
- @Bean("myAsyncEventBus")
- @Lazy(value = true)
- public AsyncEventBus createAsyncEventBus(ThreadPoolTaskExecutor threadPool) {
- log.info("myAsyncEventBus=============");
- return new AsyncEventBus(threadPool);
- }
- }
- public abstract class MyEventListener implements InitializingBean {
-
- @Autowired
- private AsyncEventBus asyncEventBus;
-
- /**
- * 注册异步任务
- */
- @Override
- public void afterPropertiesSet() {
- asyncEventBus.register(this);
- }
- }
- public class TestEvent {
-
- }
- @Component
- public class TestEventListener extends MyEventListener {
-
- @Subscribe
- @AllowConcurrentEvents
- public void execute(TestEvent event) {
- System.out.println("收到消息");
- }
- }
- @Autowired
- private AsyncEventBus asyncEventBus;
-
- @ResponseBody
- @RequestMapping("/test")
- public void test() {
- asyncEventBus.post(new TestEvent());
- }

EventBus默认是线程安全的, 内部SynchronizedSubscriber在调用业务逻辑的时候,会使用synchronized块加锁,给了多线程但实际依旧是多线程抢占锁然后顺序执行
如要使用多线程需要在订阅@Subscribe处加上@AllowConcurrentEvents注解, 注意加入此注解需要业务自己保证线程安全
- @Subscribe
- @AllowConcurrentEvents
- public void execute(Test1Event event) {
两种模式都可以用于松散耦合,改进代码管理和潜在的复用。