• @EventPublisher + @Async 异步事件流详解


    本文主要介绍Spring事件流@Async异步线程池处理,以及@Async默认线程池可能会导致的问题及解决方法。

    事件流

    Spring可以使用以观察者模式实现的事件流操作,将业务逻辑解耦,达到职责分离的效果

    Spring事件流的详解

    发布事件:

    public class EmailService implements ApplicationEventPublisherAware {
        private ApplicationEventPublisher publisher;
    
        public void sendEmail(String address, String content) {
             publisher.publishEvent(new BlackListEvent(this, address, content));
            // send email...
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    监听事件:

    @EventListener(condition = "#blEvent.content == 'foo'")
    public void processBlackListEvent(BlackListEvent blEvent) {
        // notify appropriate parties via notificationAddress...
    }
    
    • 1
    • 2
    • 3
    • 4

    注意在默认情况下,事件监听器会同步接收事件。这意味着publishEvent()方法将阻塞,直到所有侦听器都已完成对事件的处理为止。

    @Async

    @Async注解bean的一个方法,就会让它在一个单独的线程中执行。换句话说,调用者不会等待被调用方法的完成

    @Async有两个限制:

    1. 它必须仅应用于public方法
    2. 自调用(从同一个类中调用异步方法)将不起作用

    原因:该方法需要为public才可以被代理。而自调用是不生效的,因为它绕过了代理,直接调用了底层方法。

    异步返回参数

    可以通过将实际返回包装在Future中,将@Async应用于具有返回类型的方法

    示例详见How To Do @Async in Spring

    @Async
    public Future<String> asyncMethodWithReturnType() {
        System.out.println("Execute method asynchronously - " 
          + Thread.currentThread().getName());
        try {
            Thread.sleep(5000);
            return new AsyncResult<String>("hello world !!!!");
        } catch (InterruptedException e) {
            //
        }
    
        return null;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    Spring 还提供了一个实现FutureAsyncResult类。我们可以使用它来跟踪异步方法执行的结果。

    现在让我们调用上述方法并使用Future对象检索异步过程的结果。

    public void testAsyncAnnotationForMethodsWithReturnType()
      throws InterruptedException, ExecutionException {
        System.out.println("Invoking an asynchronous method. " 
          + Thread.currentThread().getName());
        Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType();
    
        while (true) {
            if (future.isDone()) {
                System.out.println("Result from asynchronous process - " + future.get());
                break;
            }
            System.out.println("Continue doing something else. ");
            Thread.sleep(1000);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    异步监听器

    如果要特定的侦听器异步处理事件,只需重用常规@Async支持:

    @EventListener
    @Async
    public void processBlackListEvent(BlackListEvent event) {
        // BlackListEvent is processed in a separate thread
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    使用异步事件时,请注意以下限制:

    • 如果事件监听器抛出Exception,它将不会传播给调用者,详见AsyncUncaughtExceptionHandler
    • 此类事件监听器无法发送答复事件。如果您需要发送另一个事件作为处理结果,请注入ApplicationEventPublisher以手动发送事件。

    @EventPublisher + @Async 阻塞

    @Async注解在使用时,不指定线程池的名称,默认SimpleAsyncTaskExecutor线程池。

    默认的线程池配置为核心线程数为8,等待队列为无界队列,即当所有核心线程都在执行任务时,后面的任务会进入队列等待,若逻辑执行速度较慢会导致线程池阻塞,从而出现监听器抛弃和无响应的结果

    spring默认线程池配置参数org.springframework.boot.autoconfigure.task.TaskExecutionProperties

    /**
     * Configuration properties for task execution.
     *
     * @author Stephane Nicoll
     * @since 2.1.0
     */
    @ConfigurationProperties("spring.task.execution")
    public class TaskExecutionProperties {
    
    	private final Pool pool = new Pool();
    
    	/**
    	 * Prefix to use for the names of newly created threads.
    	 */
    	private String threadNamePrefix = "task-";
    
    	public static class Pool {
    
    		/**
    		 * Queue capacity. An unbounded capacity does not increase the pool and therefore
    		 * ignores the "max-size" property.
    		 */
    		private int queueCapacity = Integer.MAX_VALUE;
    
    		/**
    		 * Core number of threads.
    		 */
    		private int coreSize = 8;
    
    		/**
    		 * Maximum allowed number of threads. If tasks are filling up the queue, the pool
    		 * can expand up to that size to accommodate the load. Ignored if the queue is
    		 * unbounded.
    		 */
    		private int maxSize = Integer.MAX_VALUE;
    
    		/**
    		 * Whether core threads are allowed to time out. This enables dynamic growing and
    		 * shrinking of the pool.
    		 */
    		private boolean allowCoreThreadTimeout = true;
    
    		/**
    		 * Time limit for which threads may remain idle before being terminated.
    		 */
    		private Duration keepAlive = Duration.ofSeconds(60);
    		
    		//getter/setter
    		}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    自定义线程池

    @Async注解中value参数使用自定义线程池,能让开发工程师更加明确线程池的运行规则,选取适合的线程策略,规避资源耗尽的风险

    当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

    1. ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常
    2. ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
    3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    4. ThreadPoolExecutor.CallerRunsPolicy:重试添加当前的任务,自动重复调用 execute() 方法,直到成功
    @Configuration
    public class ThreadConfig {
    
        @Bean("msgThread")
        public ThreadPoolTaskExecutor getMsgSendTaskExecutor(){
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            taskExecutor.setCorePoolSize(10);
            taskExecutor.setMaxPoolSize(25);
            taskExecutor.setQueueCapacity(800);
            taskExecutor.setAllowCoreThreadTimeOut(false);
            taskExecutor.setAwaitTerminationSeconds(60);
            taskExecutor.setThreadNamePrefix("msg-thread-");
            taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            taskExecutor.initialize();
            return taskExecutor;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    监听事件异步处理

    @EventListener(value = MsgEvent.class, condition = "#root.args[0].type == 0")
    @Async("msgThread")
    public void commonEvent(MsgEvent event) {
        //logic
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    @Async使用自定义线程池的其他方式


    参考资料:

    1. Spring事件流
    2. @Async优化
    3. How To Do @Async in Spring
    4. Spring使用@Async注解
  • 相关阅读:
    uni-app之android离线自定义基座
    【Unity-Cinemachine相机】相机跟随之Transposer属性
    【并发编程】同步容器与并发容器
    港陆证券:服装家纺公司上半年投资并购力度加大
    基于Huffman码实现的编码译码系统
    java---jar详解
    【多模态融合】TransFusion学习笔记(1)
    02.QMake项目原理和手动配置qtcreator
    MongoDB中的嵌套List操作
    Linux - 进程管理
  • 原文地址:https://blog.csdn.net/why_still_confused/article/details/128097407