• CompletableFuture理解与应用


    背景:有需要用到异步编程的,需要响应式回调,不是单线程不好,redis就很棒。根据实际情况选择,这篇文章 介绍了CompletableFutureFutureRxJavaObservable之间的区别可以看看。

    常用api:

    部分不带后缀Async的和带的区别在于是否开启新线程处理结果,根据具体实际情况选用。
    只演示有返回值的,没返回值的不掩饰。

    1. supplyAsync创建一个带返回值的异步任务。
    2. thenApplyAsync某个任务执行完执行的回调。
    3. exceptionally 某个任务执行异常时执行的回调方法。
    4. handleAsync某个任务执行完成后执行的回调方法,可以处理异常。
    5. thenCombine两个任务都正常执行完才执行后面任务。
    6. applyToEither只要其中一个执行完了就会执行某个任务。
    7. allOf多个任务都执行完成后才会执行。
    8. anyOf谁先执行完就持有谁。

    supplyAsync示例:

    runAsync功能类似只是没返回值不推荐

        @GetMapping("/supplyAsync")
        public String supplyAsync() throws ExecutionException, InterruptedException {
            System.out.println(Thread.currentThread().getName());
            long l = System.currentTimeMillis();
            CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
                System.out.println("supplyAsync--->"+Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                return 1.2;
            });
            //获取结果
            System.out.println("最终结果:"+String.valueOf(f1.get()));
            System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
            return "success";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    输出:
    可以看到主线程和子线程在运行,主线程阻塞等待子线程获取结果,不需要结果不调用get()方法即可实现异步。
    最终耗时2秒也就是主线程在等待子线程的2秒。

    http-nio-8080-exec-1
    supplyAsync--->ForkJoinPool.commonPool-worker-1
    最终结果:1.2
    耗时:2
    
    • 1
    • 2
    • 3
    • 4

    thenApplyAsync示例

    可以在任务后面链式使用,thenApply,thenAccept,thenRun 功能类似不推荐,要么没入参要么没返回值
    入参是上一个任务的返回值,出参和上一个任务的返回值类型一样。
    可以写多个thenApplyAsync来处理后续任务。

        @GetMapping("/supplyAsync")
        public String supplyAsync() throws ExecutionException, InterruptedException {
            System.out.println(Thread.currentThread().getName());
            long l = System.currentTimeMillis();
            CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
                System.out.println("supplyAsync--->"+Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                return 1.2;
            }).thenApplyAsync(result->{
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("thenApplyAsync->"+Thread.currentThread().getName());
                return result*2;
            }).thenApplyAsync(result->{
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("thenApplyAsync->"+Thread.currentThread().getName());
                return result*2;
            });
            //获取结果
            System.out.println("最终结果:"+String.valueOf(f1.get()));
            System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
            return "success";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    输出:
    supplyAsync返回值是1.2,经过thenApplyAsync后续处理返回翻倍,最终返回4.8。
    thenApplyAsync第一个任务和supplyAsync共用一个线程,第二个新开一个线程这就是后缀加Async作用。
    这是一个链路,整体耗时6秒,每一个子线程按照顺序执行,三个线程分别是2秒合计6秒。

    http-nio-8080-exec-1
    supplyAsync--->ForkJoinPool.commonPool-worker-1
    thenApplyAsync->ForkJoinPool.commonPool-worker-1
    thenApplyAsync->ForkJoinPool.commonPool-worker-1
    最终结果:4.8
    耗时:6
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    exceptionally示例

    只有前置任务发生异常才会走这里。
    当前置第一个任务发生异常后走这里,其他后续任务不走。
    返回和前置任务一样类型的数据。

        @GetMapping("/supplyAsync")
        public String supplyAsync() throws ExecutionException, InterruptedException {
            System.out.println(Thread.currentThread().getName());
            long l = System.currentTimeMillis();
            CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
                System.out.println("supplyAsync--->"+Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                int a = 1/0;
                return 1.2;
            }).thenApplyAsync(result->{
                System.out.println("thenApplyAsync--->"+Thread.currentThread().getName());
                int a = 1/0;
                return result*2;
            }).exceptionally(p->{
                System.out.println("exceptionally--->"+Thread.currentThread().getName());
                return -0.1;
            });
            //获取结果
            System.out.println("最终结果:"+String.valueOf(f1.get()));
            System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
            return "success";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    输出:
    supplyAsync发生异常,跳过thenApplyAsync不执行,直接走exceptionally

    http-nio-8080-exec-1
    supplyAsync--->ForkJoinPool.commonPool-worker-1
    exceptionally--->ForkJoinPool.commonPool-worker-1
    最终结果:-0.1
    耗时:2
    
    • 1
    • 2
    • 3
    • 4
    • 5

    whenCompleteAsync不演示

    某个任务执行完成后执行的回调方法,接收2个参数,第一个是上一个任务的返回结果,第二个是异常,没返回值不演示。

    handleAsync示例

    whenComplete基本一致,区别在于handleAsync的回调方法有返回值,且返回的结果可以自己定义与上一个任务中的泛型没关系。
    thenApplyAsync在这里抛出异常,handleAsync这里处理异常。

        @GetMapping("/supplyAsync")
        public String supplyAsync() throws ExecutionException, InterruptedException {
            System.out.println(Thread.currentThread().getName());
            long l = System.currentTimeMillis();
            CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
                System.out.println("supplyAsync--->"+Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                return 1.2;
            }).thenApplyAsync(result->{
                System.out.println("thenApplyAsync--->"+Thread.currentThread().getName());
                int a = 1/0;
                return result*2;
            });
            CompletableFuture<String> f2 = f1.handleAsync((a, b) -> {
                System.out.println("handleAsync--->" + Thread.currentThread().getName());
                System.out.println("a--->"+a);
                System.out.println("b--->"+b);
                return "error";
            });
            System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
            //获取结果
            System.out.println("最终结果:"+String.valueOf(f2.get()));
            return "success";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    输出:
    报错之后,handleAsync获取不到之前的返回值,只能获取到异常信息,handleAsync中的和返回值。

    http-nio-8080-exec-1
    supplyAsync--->ForkJoinPool.commonPool-worker-1
    thenApplyAsync--->ForkJoinPool.commonPool-worker-1
    handleAsync--->ForkJoinPool.commonPool-worker-1
    a--->null
    b--->java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    最终结果:error
    耗时:2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    thenCombine示例

    thenAcceptBothrunAfterBoth都是将两个CompletableFuture组合起来,只有都正常执行完才执行后面任务。区别在于,thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。

        @GetMapping("/supplyAsync")
        public String supplyAsync() throws ExecutionException, InterruptedException {
            System.out.println(Thread.currentThread().getName());
            long l = System.currentTimeMillis();
            CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
                System.out.println("supplyAsync1--->"+Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                return 1.2;
            });
            CompletableFuture<Double> f2 = CompletableFuture.supplyAsync(()->{
                System.out.println("supplyAsync2--->"+Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                int a = 2/0;
                return 1.8;
            }).handle((a,b)->{
               if(b!=null){
                   return 2.3;
               }else{
                   return 3.3;
               }
            });
            CompletableFuture<Double> f4 = f1.thenCombineAsync(f2, (a, b) -> {
                System.out.println("thenCombineAsync1--->"+Thread.currentThread().getName());
                System.out.println("a--->"+a);
                System.out.println("b--->"+b);
                return a+b;
            });
            //获取结果
            System.out.println("最终结果:"+String.valueOf(f4.get()));
            System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
            return "success";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    输出:
    f2出现异常用handle处理,最终thenCombineAsync拿到返回结果。
    耗时2秒,两个子线程一个是1秒一个是2秒,一起执行,最慢完成结束才结束。

    http-nio-8080-exec-2
    supplyAsync1--->ForkJoinPool.commonPool-worker-1
    supplyAsync2--->ForkJoinPool.commonPool-worker-2
    thenCombineAsync1--->ForkJoinPool.commonPool-worker-1
    a--->1.2
    b--->2.3
    最终结果:3.5
    耗时:2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    applyToEither 示例

    acceptEitherrunAfterEither都是将两个CompletableFuture组合起来,看谁跑得快就拿谁,其区别在于applyToEither有入参,有返回值;acceptEither方法入参,没有返回值;runAfterEither没方法入参,也没返回值。两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。

        @GetMapping("/supplyAsync")
        public String supplyAsync() throws ExecutionException, InterruptedException {
            System.out.println(Thread.currentThread().getName());
            long l = System.currentTimeMillis();
            CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
                System.out.println("supplyAsync1--->"+Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                return 1.2;
            });
            CompletableFuture<Double> f2 = CompletableFuture.supplyAsync(()->{
                System.out.println("supplyAsync2--->"+Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                int a = 2/0;
                return 1.8;
            }).handle((a,b)->{
               if(b!=null){
                   return 2.3;
               }else{
                   return 3.3;
               }
            }).applyToEither(f1,a->a);
            //获取结果
            System.out.println("最终结果:"+String.valueOf(f2.get()));
            System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
            return "success";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    输出:
    f2执行速度快返回2.3
    耗时1秒,谁先结束直接结束

    http-nio-8080-exec-1
    supplyAsync1--->ForkJoinPool.commonPool-worker-1
    supplyAsync2--->ForkJoinPool.commonPool-worker-2
    最终结果:2.3
    耗时:1
    
    • 1
    • 2
    • 3
    • 4
    • 5

    allOf示例

    所有任务都结束才结束

        @GetMapping("/supplyAsync")
        public String supplyAsync() throws ExecutionException, InterruptedException {
            System.out.println(Thread.currentThread().getName());
            long l = System.currentTimeMillis();
            CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
                System.out.println("supplyAsync1--->"+Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                return 1.2;
            });
            CompletableFuture<Double> f2 = CompletableFuture.supplyAsync(()->{
                System.out.println("supplyAsync2--->"+Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                return 1.8;
            });
            CompletableFuture.allOf(f1,f2);
            //获取结果
            System.out.println("最终结果:"+String.valueOf(f1.get()));
            System.out.println("最终结果:"+String.valueOf(f2.get()));
            System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
            return "success";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    输出:
    耗时2秒,等待所有任务完成才结束

    http-nio-8080-exec-2
    supplyAsync1--->ForkJoinPool.commonPool-worker-1
    supplyAsync2--->ForkJoinPool.commonPool-worker-2
    最终结果:1.2
    最终结果:1.8
    耗时:2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    anyOf示例

    返回最先执行完的任务

        @GetMapping("/supplyAsync")
        public String supplyAsync() throws ExecutionException, InterruptedException {
            System.out.println(Thread.currentThread().getName());
            long l = System.currentTimeMillis();
            CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
                System.out.println("supplyAsync1--->"+Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                return 1.2;
            });
            CompletableFuture<Double> f2 = CompletableFuture.supplyAsync(()->{
                System.out.println("supplyAsync2--->"+Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                return 1.8;
            });
            CompletableFuture<Object> f3 = CompletableFuture.anyOf(f1, f2);
            //获取结果
            System.out.println("最终结果:"+String.valueOf(f3.get()));
            System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
            return "success";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    输出
    耗时1秒优先返回耗时最短的任务

    http-nio-8080-exec-1
    supplyAsync1--->ForkJoinPool.commonPool-worker-1
    supplyAsync2--->ForkJoinPool.commonPool-worker-2
    最终结果:1.8
    耗时:1
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • 相关阅读:
    SpringBoot启动流程分析之创建SpringApplication对象(一)
    C++11标准模板(STL)- 算法(std::transform)
    QTableView练习实践00
    scratch接钻石 2023年9月中国电子学会图形化编程 少儿编程 scratch编程等级考试三级真题和答案解析
    Android -BLE 蓝牙模块开发
    酒店数字化转型,就从这4步开始
    我的第一个项目(四):(前端)发送请求以及表单校验
    dotnet 委托的实现解析(2)开放委托和封闭委托 (Open Delegates vs. Closed Delegates)
    Unregistering JMX-exposed beans on shutdown
    Docker基础-namespace
  • 原文地址:https://blog.csdn.net/Smy_0114/article/details/127880514