• SpringBoot 异步任务-Guava 中EventBus


    目录

    EventBus事件总线模式

    pom

    简单使用

    创建消息接受类

    测试类:

    输出结果:

    结论:

    结合Spring使用

    注入Bean

    MyEventListener

    消息基类

    创建监听

    发送消息

    项目启动/测试

    扩展

    多线程

    观察者模式和发布订阅模式的区别


    EventBus事件总线模式

    所谓Bus ,在计算机中就是存在主板上的总线,

    在计算机上,我们的输入/输出设备种类繁多,当我们从键盘输入一个字符串, cpu 处理完成之后回显给显示器;这个过程的传输最直接的方式就是为这些设备相互之间建立线路,这样可以解决问题, 但是随着设备越来越多,私有的线路越来越繁杂,很难以维护,于是我们暂时丢弃了这个方式,采用公共的总线方式,所有的设备将 传输的数据,以及目的设备地址发送出去,总线只是为我们进行运输,这样也很方便维护

    所以,软件中也借鉴了硬件中总线的背景,产生了事件总线模式,在软件中,组件模块会有很多,相互通信的协议,方式多种多样, 如果每个组件都要写一套,维护成本较高,那么我们就要用这个模式来解决这个问题.

    这个模式跟发布-订阅模式,观察者模式都很类似。但相比之下,EventBus解除了通知者与观察者之间的关系,是组件之间能够进一步的解耦, 是设计模式中的观察者模式的优雅实现

    传统上,Java的进程内事件分发都是通过发布者和订阅者之间的显式注册实现的。设计EventBus就是为了取代这种显示注册方式, 使组件间有了更好的解耦。使用上也非常简单

    1. 所涉及的类在eventbus 包下,主要包括:
    2. EventBus: 事件总线,以及消息的广播传输
    3. Subscriber: 订阅者,观察者
    4. Dispatcher: 事件分发,有3种实现,分别是ImmediateDispatcher(同步分发器),LegacyAsyncDispatcher(异步分发器),PerThreadQueuedDispatcher(单线程分发)
    5. SubscriberRegistry:订阅者注册器,通过ConcurrentMap<Class, CopyOnWriteArraySet> 维护了事件消息类型和 观察者之间的关系;
    6. SubscriberExceptionContext: 订阅者异常处理器上下文;
    7. SubscriberExceptionHandler:异常处理器;
    8. DeadEvent:死亡事件,事件没有相应的订阅者处理时,则标记为一个死亡事件;
    9. AsyncEventBus:异步事件总线;

    pom

    1. <!-- guava -->
    2. <dependency>
    3. <groupId>com.google.guava</groupId>
    4. <artifactId>guava</artifactId>
    5. <version>20.0</version>
    6. </dependency>

    简单使用

    创建消息接受类

    这里多创建几种不同类型的Listener消息接受类(便于理解)

    1. import com.google.common.eventbus.Subscribe;
    2. /**
    3. * @author szw
    4. * @version 1.0
    5. * @since 2022/10/20 19:54
    6. */
    7. public class EventListener1 {
    8. @Subscribe
    9. public void execute(String msg) {
    10. System.out.println("EventListener1 获得String消息" + msg);
    11. }
    12. }
    13. public class EventListener2 {
    14. @Subscribe
    15. public void execute(String msg){
    16. System.out.println("EventListener2 获得String消息" + msg);
    17. }
    18. }
    19. public class EventListener3 {
    20. @Subscribe
    21. public void execute(Integer msg) {
    22. System.out.println("EventListener3 获得Integer消息" + msg);
    23. }
    24. }
    25. public class EventListener4 {
    26. @Subscribe
    27. public void execute(EventEntity msg) {
    28. System.out.println("EventListener4 获得对象消息" + msg.toString());
    29. }
    30. }
    31. public class EventListener5 {
    32. @Subscribe
    33. public void execute(ChildrenEventEntity msg) {
    34. System.out.println("EventListener5 获得子类对象消息" + msg.toString());
    35. }
    36. }
    37. import java.io.Serializable;
    38. import lombok.Getter;
    39. import lombok.Setter;
    40. import lombok.ToString;
    41. /**
    42. * 消息封装类: 对象类型
    43. * @author szw
    44. * @version 1.0
    45. * @since 2022/10/21 09:33
    46. */
    47. @Getter
    48. @Setter
    49. @ToString
    50. public class EventEntity implements Serializable {
    51. private static final long serialVersionUID = 1L;
    52. private Integer id;
    53. private String name;
    54. public EventEntity(Integer id, String name) {
    55. this.id = id;
    56. this.name = name;
    57. }
    58. public EventEntity() {
    59. }
    60. }
    61. /**
    62. * 消息封装类: 对象类型, 测试子类
    63. * @author szw
    64. * @version 1.0
    65. * @since 2022/10/21 09:33
    66. */
    67. @Getter
    68. @Setter
    69. @ToString
    70. public class ChildrenEventEntity extends EventEntity {
    71. private static final long serialVersionUID = 1L;
    72. private String name2;
    73. public ChildrenEventEntity(String name2) {
    74. this.name2 = name2;
    75. }
    76. }

    测试类:

    1. import com.google.common.eventbus.EventBus;
    2. /**
    3. * @author szw
    4. * @version 1.0
    5. * @since 2022/10/20 19:57
    6. */
    7. public class Test {
    8. public static void main(String[] args) {
    9. // 实现事件总线
    10. EventBus eventBus = new EventBus();
    11. EventListener1 eventListener1 = new EventListener1();
    12. EventListener2 eventListener2 = new EventListener2();
    13. EventListener3 eventListener3 = new EventListener3();
    14. EventListener4 eventListener4 = new EventListener4();
    15. EventListener5 eventListener5 = new EventListener5();
    16. // 进行消息订阅
    17. eventBus.register(eventListener1);
    18. eventBus.register(eventListener2);
    19. eventBus.register(eventListener3);
    20. eventBus.register(eventListener4);
    21. eventBus.register(eventListener5);
    22. // 通知者发送消息
    23. eventBus.post("发送消息 啦啦啦.....");
    24. eventBus.post(1);
    25. eventBus.post(new EventEntity(100, "EventEntity父类对象消息"));
    26. eventBus.post(new ChildrenEventEntity("ChildrenEventEntity子类对象消息"));
    27. }
    28. }

    输出结果:

    EventListener1 获得String消息发送消息 啦啦啦.....
    EventListener2 获得String消息发送消息 啦啦啦.....
    EventListener3 获得Integer消息1
    EventListener4 获得对象消息EventEntity(id=100, name=EventEntity父类对象消息)
    EventListener5 获得子类对象消息ChildrenEventEntity(name2=ChildrenEventEntity子类对象消息)
    EventListener4 获得对象消息ChildrenEventEntity(name2=ChildrenEventEntity子类对象消息)

    结论:

    1. eventBus会根据Listener的参数类型的不同,分别向不同的Subscribe发送不同的消息。
    2. 参数类型可以是封装类
    3. 参数类型相同的Listener会同时接到消息
    4. 关于继承,发送父类消息,子类不会接到父类的消息,发送子类消息, 子类和父类都可以收到

    结合Spring使用

    直接new的话也太不Spring了, 结合Spring使用的话还是用注入的方式, 这里简单实现下

    注入Bean

    1. @Configuration
    2. @Slf4j
    3. public class SpringConfig {
    4. @Bean("myAsyncEventBus")
    5. @Lazy(value = true)
    6. public AsyncEventBus createAsyncEventBus(ThreadPoolTaskExecutor threadPool) {
    7. log.info("myAsyncEventBus=============");
    8. return new AsyncEventBus(threadPool);
    9. }
    10. }

    MyEventListener

    1. public abstract class MyEventListener implements InitializingBean {
    2. @Autowired
    3. private AsyncEventBus asyncEventBus;
    4. /**
    5. * 注册异步任务
    6. */
    7. @Override
    8. public void afterPropertiesSet() {
    9. asyncEventBus.register(this);
    10. }
    11. }

    消息基类

    1. public class TestEvent {
    2. }

    创建监听

    1. @Component
    2. public class TestEventListener extends MyEventListener {
    3. @Subscribe
    4. @AllowConcurrentEvents
    5. public void execute(TestEvent event) {
    6. System.out.println("收到消息");
    7. }
    8. }

    发送消息

    1. @Autowired
    2. private AsyncEventBus asyncEventBus;
    3. @ResponseBody
    4. @RequestMapping("/test")
    5. public void test() {
    6. asyncEventBus.post(new TestEvent());
    7. }

    项目启动/测试

    扩展

    多线程

    EventBus默认是线程安全的, 内部SynchronizedSubscriber在调用业务逻辑的时候,会使用synchronized块加锁,给了多线程但实际依旧是多线程抢占锁然后顺序执行

    如要使用多线程需要在订阅@Subscribe处加上@AllowConcurrentEvents注解, 注意加入此注解需要业务自己保证线程安全

    1. @Subscribe
    2. @AllowConcurrentEvents
    3. public void execute(Test1Event event) {

    观察者模式和发布订阅模式的区别

    两种模式都可以用于松散耦合,改进代码管理和潜在的复用。

    • 最大的区别是调度的地方:  观察者模式中主体和观察者是互相感知的,发布-订阅模式是借助第三方来实现调度的,发布者和订阅者之间互不感知
    • 从表面上看: 观察者模式里,只有两个角色一一 观察者+被观察者, 而发布订阅模式里,却不仅仅只有发布者和订阅者两个角色,还有一个经常被我们忽略的一一经纪人Broker
    • 往更深层次讲: 观察者和被观察者,是松耦合的关系, 发布者和订阅者,则完全不存在耦合
    • 从使用层面上讲: 观察者模式,多用于单个应用内部, 发布订阅模式,则更多的是一种跨应用的模式(cross-application pattern),比如消息中间件

  • 相关阅读:
    【CSAPP】现代操作系统前几章
    202206 集合 面试
    Linux、docker、kubernetes、MySql、Shell运维快餐
    openmp 通用核心 学习 2 数据环境—任务-内存模型
    数字电路学习
    心跳包
    库存预占架构升级方案设计-交易库存中心
    鸿蒙开发(五)鸿蒙UI开发概览
    SeriLog日志框架的应用
    CSP-S2019 Day2
  • 原文地址:https://blog.csdn.net/qq_44695727/article/details/127433495