• @Async注解


    一、简介

    1)在方法上使用该@Async注解,申明该方法是一个异步任务;
    2)在类上面使用该@Async注解,申明该类中的所有方法都是异步任务;
    3)方法上一旦标记了这个@Async注解,当其它线程调用这个方法时,就会开启一个新的子线程去异步处理该业务逻辑。
    4)使用此注解的方法的类对象,必须是spring管理下的bean对象;
    5)要想使用异步任务,需要在主类上开启异步配置,即配置上@EnableAsync注解;

    二、使用

    1、基础代码示例

    1)启动类中增加@EnableAsync

    以Spring boot 为例,启动类中增加@EnableAsync:

    @EnableAsync
    @SpringBootApplication
    public class ManageApplication {
        //...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    2)方法上加@Async注解:
    @Component
    public class MyAsyncTask {
         @Async
        public void asyncCpsItemImportTask(Long platformId, String jsonList){
            //...具体业务逻辑
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2、隐含问题一:默认线程池配置不合适,导致系统奔溃

    @Async注解在使用时,如果不指定线程池的名称,则使用Spring默认的线程池,Spring默认的线程池为SimpleAsyncTaskExecutor。

    该类型线程池的默认配置:

        默认核心线程数:8,
        
        最大线程数:Integet.MAX_VALUE,
        队列使用LinkedBlockingQueue,
        容量是:Integet.MAX_VALUE,
        空闲线程保留时间:60s,
        线程池拒绝策略:AbortPolicy。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    从最大线程数的配置上,相信你也看到问题了:并发情况下,会无限创建线程、然后OOM、然后系统崩溃。。。

    1)问题一解决方法一:

    可以通过修改线程池默认配置,来解决上述问题;

    spring:
      task:
        execution:
          pool:
            max-size: 6
            core-size: 3
            keep-alive: 3s
            queue-capacity: 1000
            thread-name-prefix: name
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    2)问题一解决方法二:

    @Async注解,支持使用自定义线程池,所以通过自定义线程池解决上述问题。
    或者说,有时候、实际开发中就是要求你必修使用指定的线程池,@Async注解是支持的。

    a、编写配置类

    @Configuration
    @Data
    public class ExecutorConfig{
        /**
         * 核心线程
         */
         @Value("${***}")
        private int corePoolSize;
        /**
         * 最大线程
         */
         @Value("${***}")
        private int maxPoolSize;
        /**
         * 队列容量
         */
         @Value("${***}")
        private int queueCapacity;
        /**
         * 保持时间
         */
         @Value("${***}")
        private int keepAliveSeconds;
        /**
         * 名称前缀
         */
         @Value("${***}")
        private String preFix;
     
        @Bean("MyExecutor")
        public Executor myExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(corePoolSize);
            executor.setMaxPoolSize(maxPoolSize);
            executor.setQueueCapacity(queueCapacity);
            executor.setKeepAliveSeconds(keepAliveSeconds);
            executor.setThreadNamePrefix(preFix);
            executor.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy());
            executor.initialize();
            return executor;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    b、方法上加@Async注解,同时指定自定义线程池

    @Component
    public class MyAsyncTask {
         @Async("MyExecutor") //使用自定义的线程池(执行器)
        public void asyncCpsItemImportTask(Long platformId, String jsonList){
            //...具体业务逻辑
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    3、隐含问题二:异步任务的事务问题

    @Async注解由于是异步执行的,在其进行数据库的操作之时,将无法控制事务管理。
    解决办法:可以把@Transactional注解放到内部的需要进行事务的方法上;
    即将方法中对数据库的操作集中提取出来、放入一个方法中,对该方法加@Transactional注解进行事务控制

    4、隐含问题三:在同类方法中调用@Async方法,没有异步执行

    @Async的原理概括:

    @Async 异步执行,是通过 Spring AOP 动态代理 的方式来实现的。Spring容器启动初始化bean时,判断类中是否使用了@Async注解,如果使用了则为其创建切入点和切入点处理器,根据切入点创建代理,在线程调用@Async注解标注的方法时,会调用代理,执行切入点处理器invoke方法,将方法的执行提交给线程池中的另外一个线程来处理,从而实现了异步执行。

    所以,如果a方法调用它同类中的标注@Async的b方法,是不会异步执行的,因为从a方法进入调用的都是该类对象本身,不会进入代理类。因此,相同类中的方法调用带@Async的方法是无法异步的,这种情况仍然是同步。

    三、异步任务的返回结果

    异步的业务逻辑处理场景 有两种:一个是不需要返回结果,另一种是需要接收返回结果。

    不需要返回结果的比较简单,就不多说了。

    需要接收返回结果的示例如下:

    @Async("MyExecutor")
    public Future<Map<Long, List>> queryMap(List ids) {
        List<> result = businessService.queryMap(ids);
        ..............
        Map<Long, List> resultMap = Maps.newHashMap();
        ...
        return new AsyncResult<>(resultMap);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    调用异步方法的示例:

    public Map<Long, List> asyncProcess(List<BindDeviceDO> bindDevices,List<BindStaffDO> bindStaffs, String dccId) {
            Map<Long, List> finalMap =null;
            // 返回值:
            Future<Map<Long, List>> asyncResult = MyService.queryMap(ids);
            try {
                finalMap = asyncResult.get();
            } catch (Exception e) {
                ...
            }
            return finalMap;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    我个人觉得,异步方法不该设置返回值;因为调用异步方法的地方,还要等待返回结果的话,那就差不多又成了串行执行了,失去了异步的意义。

    四、检测给@Async配置自定义线程池、会是整个项目共用的吗?

    1、线程池本身也会消耗内存资源,所以我们要控制线程池的规模,防止它占用过多资源、进而影响项目运行;
    2、为了统一规划资源,线程池尽量统一配置,即全项目尽量使用同一个线程池。
    3、那么使用@Async,并自定义线程池,会全局公用吗?

    我们做如下测试:

    1)配置参数,并编写线程池配置类

    /**application.yml配置**/
    # 自定义线程池参数(用以@Async使用,可选)
    execution:
      pool:
        core-size: 3
        queue-capacity: 500
        max-size: 10
        keep-alive: 3
        thread-name-prefix: customize-th-
    
    
    /**线程池配置类**/
    @Configuration
    public class ExecutorConfig {
    
        /**
         * 核心线程
         */
        @Value("${execution.pool.core-size}")
        private int corePoolSize;
        /**
         * 队列容量
         */
        @Value("${execution.pool.queue-capacity}")
        private int queueCapacity;
        /**
         * 最大线程
         */
        @Value("${execution.pool.max-size}")
        private int maxPoolSize;
        /**
         * 保持时间
         */
        @Value("${execution.pool.keep-alive}")
        private int keepAliveSeconds;
        /**
         * 名称前缀
         */
        @Value("${execution.pool.thread-name-prefix}")
        private String preFix;
    
        @Bean("MyExecutor")
        public Executor myExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(corePoolSize);
            executor.setMaxPoolSize(maxPoolSize);
            executor.setQueueCapacity(queueCapacity);
            executor.setKeepAliveSeconds(keepAliveSeconds);
            executor.setThreadNamePrefix(preFix);
            executor.setRejectedExecutionHandler( new ThreadPoolExecutor.AbortPolicy());
            executor.initialize();
            return executor;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    2)写两个测试类,使用@Async标记方法

    /**测试类A**/
    @Service
    public class TestServiceAImpl implements TestServiceA {
    
        @Async("MyExecutor")
        @Override
        public void testMethod1() {
            for (int i = 0; i < 10; i++) {
                final int index = i;
                System.out.println("A类方法一的 " + index + " 被执行,线程名:" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        @Async("MyExecutor")
        @Override
        public void testMethod2() {
            for (int i = 0; i < 10; i++) {
                final int index = i;
                System.out.println("A类方法二的 " + index + " 被执行,线程名:" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    /**测试类B**/
    @Service
    public class TestServiceBImpl implements TestServiceB {
    
        @Async("MyExecutor")
        @Override
        public void testMethod1() {
            for (int i = 0; i < 10; i++) {
                final int index = i;
                System.out.println("B类方法一的 " + index + " 被执行,线程名:" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        @Async("MyExecutor")
        @Override
        public void testMethod2() {
            for (int i = 0; i < 10; i++) {
                final int index = i;
                System.out.println("B类方法二的 " + index + " 被执行,线程名:" + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    3)写一个测试Controller接口,异步调用两个测试类的方法

    @RestController
    public class TestController{
    
    	@Autowired
        private TestServiceA testServiceA;
    
        @Autowired
        private TestServiceB testServiceB;
    
        /**
         * 测试线程01
         */
        @GetMapping(value = "/threadTest")
        public void threadTest01() {
            System.out.println("【线程一】" + "group:"+ Thread.currentThread().getThreadGroup() + "; id:" +Thread.currentThread().getId()+"; name:"+ Thread.currentThread().getName());
    
            testServiceA.testMethod1();
            testServiceA.testMethod2();
    
            testServiceB.testMethod1();
            testServiceB.testMethod2();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    4)分析执行结果

    执行结果如下:

    2023-04-12 14:03:38.945 [http-nio-8085-exec-2] INFO  o.a.c.c.C.[.[.[/] - [log,173] - Initializing Spring DispatcherServlet 'dispatcherServlet'
    【线程一】group:java.lang.ThreadGroup[name=main,maxpri=10]; id:85; name:http-nio-8085-exec-2
    A类方法一的 0 被执行,线程名:customize-th-1
    A类方法二的 0 被执行,线程名:customize-th-2
    B类方法一的 0 被执行,线程名:customize-th-3
    A类方法一的 1 被执行,线程名:customize-th-1
    B类方法一的 1 被执行,线程名:customize-th-3
    A类方法二的 1 被执行,线程名:customize-th-2
    B类方法一的 2 被执行,线程名:customize-th-3
    A类方法二的 2 被执行,线程名:customize-th-2
    A类方法一的 2 被执行,线程名:customize-th-1
    A类方法一的 3 被执行,线程名:customize-th-1
    A类方法二的 3 被执行,线程名:customize-th-2
    B类方法一的 3 被执行,线程名:customize-th-3
    A类方法二的 4 被执行,线程名:customize-th-2
    A类方法一的 4 被执行,线程名:customize-th-1
    B类方法一的 4 被执行,线程名:customize-th-3
    A类方法二的 5 被执行,线程名:customize-th-2
    B类方法一的 5 被执行,线程名:customize-th-3
    A类方法一的 5 被执行,线程名:customize-th-1
    A类方法一的 6 被执行,线程名:customize-th-1
    B类方法一的 6 被执行,线程名:customize-th-3
    A类方法二的 6 被执行,线程名:customize-th-2
    A类方法一的 7 被执行,线程名:customize-th-1
    A类方法二的 7 被执行,线程名:customize-th-2
    B类方法一的 7 被执行,线程名:customize-th-3
    B类方法一的 8 被执行,线程名:customize-th-3
    A类方法二的 8 被执行,线程名:customize-th-2
    A类方法一的 8 被执行,线程名:customize-th-1
    B类方法一的 9 被执行,线程名:customize-th-3
    A类方法一的 9 被执行,线程名:customize-th-1
    A类方法二的 9 被执行,线程名:customize-th-2
    B类方法二的 0 被执行,线程名:customize-th-1
    B类方法二的 1 被执行,线程名:customize-th-1
    B类方法二的 2 被执行,线程名:customize-th-1
    B类方法二的 3 被执行,线程名:customize-th-1
    B类方法二的 4 被执行,线程名:customize-th-1
    B类方法二的 5 被执行,线程名:customize-th-1
    B类方法二的 6 被执行,线程名:customize-th-1
    B类方法二的 7 被执行,线程名:customize-th-1
    B类方法二的 8 被执行,线程名:customize-th-1
    B类方法二的 9 被执行,线程名:customize-th-1
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    【分析】
    从结果可以看到,A类、B类的方法交替执行,但是他们的线程都来自同一个线程池“customize-th-”、也就是我自己配置的线程池。
    不仅如此,它们还遵循我对线程池的配置(核心线程数3),每当正在运行的线程满3,不论是A类还是B类、接下来的任务就先放入队列,等有空余线程再执行。
    从以上两点可以确认,A类和B类用的是同一个线程池,@Async注解使用自定义线程池异步执行任务,只要在注解后添加线程池配置名称@Async(“MyExecutor”)、就可以实现整个项目公用同一个线程池。

  • 相关阅读:
    【管理运筹学】第 10 章 | 排队论(3,标准的 M/M/1 排队系统)
    解决多线程调用sql存储过程问题
    python神经网络框架有哪些,python调用神经网络模型
    工作流的例子
    某金融机构分布式数据库架构方案与运维方案设计分享
    上周热点回顾(5.6-5.12)
    Vue3中如何通过内嵌iframe传递参数与接收参数
    VcXsrv Releases 21.1.13 更新发布了~~
    上周热点回顾(5.15-5.21)
    vmware vcenter conventer 可以转换windows2000的版本有下载链接吗
  • 原文地址:https://blog.csdn.net/Derek7117/article/details/127842150