• Spring Boot事件机制浅析


    1、概述

    在设计模式中,观察者模式是一个比较常用的设计模式。维基百科解释如下:

     观察者模式是软件设计模式的一种。在此种模式中,一个目标对象管理所有相依于它的观察者对象,并且在它本身的状态改变时主动发出通知。这通常透过呼叫各观察者所提供的方法来实现。

    在我们日常业务开发中,观察者模式对我们很大的一个作用,在于实现业务的解耦、传参等。以用户注册的场景来举例子,假设在用户注册完成时,需要给该用户发送邮件、发送优惠劵等等操作,如下图所示:

    图片

    图片

    • UserService 在完成自身的用户注册逻辑之后,仅仅只需要发布一个 UserRegisterEvent 事件,而无需关注其它拓展逻辑。

    • 其它 Service 可以自己订阅UserRegisterEvent 事件,实现自定义的拓展逻辑。

    注意:发布订阅模式属于广义上的观察者模式

    在观察者模式中,观察者需要直接订阅目标事件;在目标发出内容改变的事件后,直接接收事件并作出响应

    1. ╭─────────────╮ Fire Event ╭──────────────╮
    2. │ │─────────────>│ │
    3. │ Subject │ │ Observer │
    4. │ │<─────────────│ │
    5. ╰─────────────╯ Subscribe ╰──────────────╯

    在发布订阅模式中,发布者和订阅者之间多了一个发布通道;一方面从发布者接收事件,另一方面向订阅者发布事件;订阅者需要从事件通道订阅事件,以此避免发布者和订阅者之间产生依赖关系

    1. ╭─────────────╮ ╭───────────────╮ Fire Event ╭──────────────╮
    2. │ │ Publish Event │ │───────────────>│ │
    3. │ Publisher │────────────────>│ Event Channel │ │ Subscriber │
    4. │ │ │ │<───────────────│ │
    5. ╰─────────────╯ ╰───────────────╯ Subscribe ╰──────────────╯

    简单来说,发布订阅模式属于广义上的观察者模式,在观察者模式的 Subject 和 Observer 的基础上,引入 Event Channel 这个中介,进一步解耦。

    2、事件模式中的概念

    • 事件源:事件的触发者,比如注册用户信息,入库,发布“用户XX注册成功”。

    • 事件:描述发生了什么事情的对象,比如:XX注册成功的事件

    • 事件监听器:监听到事件发生的时候,做一些处理,比如 注册成功后发送邮件、赠送积分、发优惠券…

    3、spring事件使用步骤

    • 定义事件

      自定义事件,需要继承ApplicationEvent类,实现自定义事件。另外,通过它的 source 属性可以获取事件源,timestamp 属性可以获得发生时间。

    • 定义监听器

      自定义事件监听器,需要实现ApplicationListener接口,实现onApplicationEvent方法,处理感兴趣的事件

    • 创建事件广播器

      创建事件广播器实现ApplicationEventMulticaster接口,也可以使用spring定义好的SimpleApplicationEventMulticaster:

      ApplicationEventMulticaster applicationEventMulticaster = new SimpleApplicationEventMulticaster();
    • 向广播器中注册事件监听器

      将事件监听器注册到广播器ApplicationEventMulticaster中,

      applicationEventMulticaster.addApplicationListener(new SendEmailOnOrderCreaterListener());
    • 通过广播器发布事件

      广播事件,调用ApplicationEventMulticaster#multicastEvent方法广播事件,此时广播器中对这个事件感兴趣的监听器会处理这个事件。

      applicationEventMulticaster.multicastEvent(new OrderCreateEvent(applicationEventMulticaster, 1L));

    4、使用方式

    4.1 面向接口的方式

    案例:实现用户注册成功后发布事件,然后在监听器中发送邮件的功能。

    用户注册事件:

    创建 UserRegisterEvent事件类,继承 ApplicationEvent 类,用户注册事件。代码如下:

    1. public class UserRegistryEvent extends ApplicationEvent {
    2. private String userName;
    3. public UserRegistryEvent(Object source, String userName) {
    4. super(source);
    5. this.userName = userName;
    6. }
    7. public String getUserName() {
    8. return userName;
    9. }
    10. }

    发送邮件监听器:

    创建 SendEmailListener 类,邮箱 Service。代码如下:

    1. @Component
    2. public class SendEmailListener implements ApplicationListener<UserRegistryEvent> {
    3. Logger LOGGER = LoggerFactory.getLogger(SendEmailListener.class);
    4. @Override
    5. public void onApplicationEvent(UserRegistryEvent event) {
    6. LOGGER.info("给用户{}发送注册成功邮件!", event.getUserName());
    7. }
    8. }

    注意:

    • 实现 ApplicationListener 接口,通过 E 泛型设置感兴趣的事件,如UserRegistryEvent;

    • 实现 #onApplicationEvent(E event) 方法,针对监听的 UserRegisterEvent 事件,进行自定义处理。

    用户注册服务:注册功能+发布用户注册事件

    创建UserRegisterService 类,用户 Service。代码如下:

    1. @Service
    2. @Slf4j
    3. public class UserRegisterService implements ApplicationEventPublisherAware {
    4. private ApplicationEventPublisher applicationEventPublisher;
    5. @Override
    6. public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
    7. this.applicationEventPublisher = applicationEventPublisher;
    8. }
    9. public void registryUser(String userName) {
    10. // 用户注册(入库等)
    11. log.info("用户{}注册成功", userName);
    12. applicationEventPublisher.publishEvent(new UserRegistryEvent(this, userName));
    13. //applicationEventPublisher.publishEvent(event);
    14. }
    15. }

    注意:

    • 上面实现了ApplicationEventPublisherAware接口,spring容器会通过setApplicationEventPublisher将ApplicationEventPublisher注入进来,然后我们就可以使用这个来发布事件了;

    • 在执行完注册逻辑后,调用 ApplicationEventPublisher 的 [#publishEvent(ApplicationEvent event)]方法,发布[UserRegisterEvent]事件

    调用:

    1. @RestController
    2. public class SpringEventController {
    3. @Autowired
    4. private UserRegisterService userRegisterService;
    5. @GetMapping("test-spring-event")
    6. public Object test(String name){
    7. LocalDateTime dateTime = LocalDateTime.now();
    8. userRegisterService.registryUser(name);
    9. return dateTime.toString() + ":spring";
    10. }
    11. }

    运行 http://localhost:12000/server/test-spring-event?name=name1

    输出:

    1. 用户name1注册成功
    2. 给用户name1发送注册成功邮件!

    原理:
    spring容器在创建bean的过程中,会判断bean是否为ApplicationListener类型,进而会将其作为监听器注册到AbstractApplicationContext#applicationEventMulticaster中,

    1. AbstractApplicationContext.java -》ApplicationEventPublisher
    2. @Override
    3. public void addApplicationListener(ApplicationListener listener) {
    4. Assert.notNull(listener, "ApplicationListener must not be null");
    5. if (this.applicationEventMulticaster != null) {
    6. this.applicationEventMulticaster.addApplicationListener(listener); // 广播器中添加监听器
    7. }
    8. this.applicationListeners.add(listener);
    9. }
    10. // 发布事件
    11. protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
    12. Assert.notNull(event, "Event must not be null");
    13. // Decorate event as an ApplicationEvent if necessary
    14. ApplicationEvent applicationEvent;
    15. if (event instanceof ApplicationEvent) {
    16. applicationEvent = (ApplicationEvent) event;
    17. }
    18. else {
    19. applicationEvent = new PayloadApplicationEvent<>(this, event);
    20. if (eventType == null) {
    21. eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
    22. }
    23. }
    24. // Multicast right now if possible - or lazily once the multicaster is initialized
    25. if (this.earlyApplicationEvents != null) {
    26. this.earlyApplicationEvents.add(applicationEvent);
    27. }
    28. else {
    29. getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
    30. }
    31. // Publish event via parent context as well...
    32. if (this.parent != null) {
    33. if (this.parent instanceof AbstractApplicationContext) {
    34. ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
    35. }
    36. else {
    37. this.parent.publishEvent(event);
    38. }
    39. }
    40. }

    这块的源码在下面这个方法中,

    org.springframework.context.support.ApplicationListenerDetector#postProcessAfterInitialization

    1. @Override
    2. public Object postProcessAfterInitialization(Object bean, String beanName) {
    3. if (bean instanceof ApplicationListener) {
    4. // potentially not detected as a listener by getBeanNamesForType retrieval
    5. Boolean flag = this.singletonNames.get(beanName);
    6. if (Boolean.TRUE.equals(flag)) {
    7. // singleton bean (top-level or inner): register on the fly
    8. this.applicationContext.addApplicationListener((ApplicationListener<?>) bean);
    9. }
    10. else if (Boolean.FALSE.equals(flag)) {
    11. if (logger.isWarnEnabled() && !this.applicationContext.containsBean(beanName)) {
    12. // inner bean with other scope - can't reliably process events
    13. logger.warn("Inner bean '" + beanName + "' implements ApplicationListener interface " +
    14. "but is not reachable for event multicasting by its containing ApplicationContext " +
    15. "because it does not have singleton scope. Only top-level listener beans are allowed " +
    16. "to be of non-singleton scope.");
    17. }
    18. this.singletonNames.remove(beanName);
    19. }
    20. }
    21. return bean;
    22. }

    4.2 面向@EventListener注解的方式

    可以通过 condition 属性指定一个SpEL表达式,如果返回 “true”, “on”, “yes”, or “1” 中的任意一个,则事件会被处理,否则不会。

    1. @EventListener(condition = "#userRegistryEvent.userName eq 'name2'")
    2. public void getCustomEvent(UserRegistryEvent userRegistryEvent) {
    3. LOGGER.info("EventListener 给用户{}发送注册邮件成功!", userRegistryEvent.getUserName());
    4. }

    运行http://localhost:12000/server/test-spring-event?name=name1

    输出:

    1. 用户name1注册成功
    2. 给用户name1发送注册成功邮件!

    运行http://localhost:12000/server/test-spring-event?name=name2

    输出:

    1. 用户name2注册成功
    2. 给用户name2发送注册成功邮件!
    3. EventListener 给用户name2发送注册邮件成功!

    原理:

    EventListenerMethodProcessor实现了SmartInitializingSingleton接口,SmartInitializingSingleton接口中的afterSingletonsInstantiated方法会在所有单例的bean创建完成之后被spring容器调用。spring中处理@EventListener注解源码位于下面的方法中

    org.springframework.context.event.EventListenerMethodProcessor#afterSingletonsInstantiated

    1. public class EventListenerMethodProcessor
    2. implements SmartInitializingSingleton, ApplicationContextAware, BeanFactoryPostProcessor {
    3. @Override
    4. public void afterSingletonsInstantiated() {
    5. ...
    6. ...
    7. ...
    8. try {
    9. processBean(beanName, type); //bean
    10. }
    11. catch (Throwable ex) {
    12. throw new BeanInitializationException("Failed to process @EventListener " +
    13. "annotation on bean with name '" + beanName + "'", ex);
    14. }
    15. }
    16. }
    17. }
    18. }
    19. private void processBean(final String beanName, final Class targetType) {
    20. if (!this.nonAnnotatedClasses.contains(targetType) &&
    21. AnnotationUtils.isCandidateClass(targetType, EventListener.class) &&
    22. !isSpringContainerClass(targetType)) {
    23. Map<Method, EventListener> annotatedMethods = null;
    24. try {
    25. annotatedMethods = MethodIntrospector.selectMethods(targetType,
    26. (MethodIntrospector.MetadataLookup<EventListener>) method ->
    27. AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
    28. }
    29. catch (Throwable ex) {
    30. // An unresolvable type in a method signature, probably from a lazy bean - let's ignore it.
    31. if (logger.isDebugEnabled()) {
    32. logger.debug("Could not resolve methods for bean with name '" + beanName + "'", ex);
    33. }
    34. }
    35. if (CollectionUtils.isEmpty(annotatedMethods)) {
    36. this.nonAnnotatedClasses.add(targetType);
    37. if (logger.isTraceEnabled()) {
    38. logger.trace("No @EventListener annotations found on bean class: " + targetType.getName());
    39. }
    40. }
    41. else {
    42. // Non-empty set of methods
    43. ConfigurableApplicationContext context = this.applicationContext;
    44. Assert.state(context != null, "No ApplicationContext set");
    45. List<EventListenerFactory> factories = this.eventListenerFactories;
    46. Assert.state(factories != null, "EventListenerFactory List not initialized");
    47. for (Method method : annotatedMethods.keySet()) {
    48. for (EventListenerFactory factory : factories) {
    49. if (factory.supportsMethod(method)) { // 此处,针对所有EventListener注解的方法,均返回true,
    50. Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
    51. ApplicationListener applicationListener =
    52. factory.createApplicationListener(beanName, targetType, methodToUse);
    53. if (applicationListener instanceof ApplicationListenerMethodAdapter) {
    54. ((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator);
    55. }
    56. context.addApplicationListener(applicationListener);// 往容器中注入监听器,同 接口方式
    57. break;
    58. }
    59. }
    60. }
    61. if (logger.isDebugEnabled()) {
    62. logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" +
    63. beanName + "': " + annotatedMethods);
    64. }
    65. }
    66. }
    67. }
    68. }

    4.3 监听器排序

    如果某个事件有多个监听器,默认情况下,监听器执行顺序是无序的,不过我们可以为监听器指定顺序。

    4.3.1 通过接口实现监听器:

    三种方式指定监听器顺序:

    • 实现org.springframework.core.Ordered接口#getOrder,返回值越小,顺序越高

    • 实现org.springframework.core.PriorityOrdered接口#getOrder

    • 类上使用org.springframework.core.annotation.Order注解

    4.3.2 通过@EventListener:

    可以在标注@EventListener的方法上面使用@Order(顺序值)注解来标注顺序,

    4.4 监听器异步模式

    监听器最终通过ApplicationEventMulticaster内部的实现来调用,默认实现类SimpleApplicationEventMulticaster,这个类是支持监听器异步调用的。

    1. @Override
    2. public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
    3. ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
    4. Executor executor = getTaskExecutor();
    5. for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
    6. if (executor != null) {
    7. executor.execute(() -> invokeListener(listener, event));
    8. }
    9. else {
    10. invokeListener(listener, event);
    11. }
    12. }
    13. }

    上面的invokeListener方法内部就是调用监听器,从代码可以看出,如果当前executor不为空,监听器就会被异步调用,所以如果需要异步只需要让executor不为空就可以了,但是默认情况下executor是空的,此时需要我们来给其设置一个值,下面我们需要看容器中是如何创建广播器的,我们在那个地方去干预。

    AnnotationConfigServletWebServerApplicationContext -》 ServletWebServerApplicationContext -》 GenericWebApplicationContext -》 GenericApplicationContext -》 AbstractApplicationContext -》 ConfigurableApplicationContext -》 ApplicationContext -》 ApplicationEventPublisher
    

    通常我们使用的容器是继承于AbstractApplicationContext类型的,在容器启动的时候会调用AbstractApplicationContext#initApplicationEventMulticaster,初始化广播器:

    1. private ApplicationEventMulticaster applicationEventMulticaster;
    2. public static final String APPLICATION_EVENT_MULTICASTER_BEAN_NAME = "applicationEventMulticaster";
    3. protected void initApplicationEventMulticaster() {
    4. ConfigurableListableBeanFactory beanFactory = getBeanFactory();
    5. if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) { // 判断容器中是否有一个 applicationEventMulticaster bean,有的话直接拿到使用
    6. this.applicationEventMulticaster =
    7. beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
    8. if (logger.isTraceEnabled()) {
    9. logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
    10. }
    11. }
    12. else {
    13. this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
    14. beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
    15. if (logger.isTraceEnabled()) {
    16. logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
    17. "[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
    18. }
    19. }
    20. }

    判断spring容器中是否有名称为applicationEventMulticaster的bean,如果有就将其作为事件广播器,否则创建一个SimpleApplicationEventMulticaster作为广播器,并将其注册到spring容器中。

    自定义一个类型为SimpleApplicationEventMulticaster名称为applicationEventMulticaster的bean就可以了,顺便给executor设置一个值,就可以实现监听器异步执行了。

    实现如下:

    1. @Configuration
    2. public class SyncListenerConfig {
    3. @Bean
    4. public ApplicationEventMulticaster applicationEventMulticaster() {
    5. // 创建一个事件广播器
    6. SimpleApplicationEventMulticaster result = new SimpleApplicationEventMulticaster();
    7. // 给广播器提供一个线程池,通过这个线程池来调用事件监听器
    8. ThreadPoolTool threadPoolTool = new ThreadPoolTool();
    9. ThreadPoolExecutor executor = threadPoolTool.build();
    10. // 设置异步执行器
    11. result.setTaskExecutor(executor);
    12. return result;
    13. }
    14. }
    15. @Slf4j
    16. //@Data
    17. public class ThreadPoolTool {
    18. private static int corePoolSize = Runtime.getRuntime().availableProcessors();
    19. private static int maximumPoolSize = corePoolSize * 2;
    20. private static long keepAliveTime = 10;
    21. private static TimeUnit unit = TimeUnit.SECONDS;
    22. private static BlockingQueue workQueue = new ArrayBlockingQueue<>(3);
    23. private static ThreadFactory threadFactory = new NameTreadFactory();
    24. private static RejectedExecutionHandler handler = new MyIgnorePolicy();
    25. private ThreadPoolExecutor executor;
    26. public ThreadPoolExecutor build() {
    27. executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit,
    28. workQueue, threadFactory, handler);
    29. executor.prestartAllCoreThreads(); // 预启动所有核心线程
    30. return executor;
    31. }
    32. }
    33. @Slf4j
    34. public class NameTreadFactory implements ThreadFactory {
    35. private AtomicInteger mThreadNum = new AtomicInteger(1);
    36. @Override
    37. public Thread newThread(Runnable r) {
    38. Thread thread = new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
    39. log.info(thread.getName() + " has been created");
    40. return thread;
    41. }
    42. }

    运行后输出:

    1. INFO []2023-02-15 14:58:49.182[org.im.eventtest.spring.UserRegisterService][31][http-nio-12000-exec-1][INFO]-用户name2注册成功
    2. INFO []2023-02-15 14:58:49.184[org.im.eventtest.spring.SendEmailListener][24][my-thread-16][INFO]-给用户name2发送注册成功邮件!
    3. INFO []2023-02-15 14:58:49.278[org.im.eventtest.spring.SendEmailListener][30][my-thread-15][INFO]-EventListener 给用户name2发送注册邮件成功!

    5、使用建议

    • 可以使用spring事件机制来传参、解耦等;

    • 对于一些非主要的业务(失败后不影响主业务处理),可以使用异步的事件模式;

    • spring中事件无论是使用接口的方式还是使用注解的方式,都可以(最好团队内部统一使用一种方式)。

  • 相关阅读:
    读懂“渠道分销数字化转型”,纷享销客用5个核心角色驱动力解构
    JavaScript 63 JavaScript 对象 63.6 JavaScript 对象构造器
    Stream流学习(四)查找 / 匹配操作
    iOS Error Domain=PHPhotosErrorDomain Code=3300
    Least-upper-bound property
    Oracle数据迁移实用入门
    进化计算领域exploration和exploitation的区别
    inline的讨论——标准库的模板变量
    C# 简单封装异步Socket Server
    Spring Boot - filter 的顺序
  • 原文地址:https://blog.csdn.net/Jernnifer_mao/article/details/133296192