• JAVA中使用CompletableFuture进行异步编程


    JAVA中使用CompletableFuture进行异步编程

    1、什么是CompletableFuture

    CompletableFuture 是 JDK8 提供的 Future 增强类,CompletableFuture 异步任务执行线程池,默认是把异步任

    务都放在 ForkJoinPool 中执行。

    在这种方式中,主线程不会被阻塞,不需要一直等到子线程完成,主线程可以并行的执行其他任务。

    2、Future存在的问题

    Future 实际采用 FutureTask 实现,该对象相当于是消费者和生产者的桥梁,消费者通过 FutureTask 存储任务

    的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接

    口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。

    通常的线程池接口类 ExecutorService,其中execute方法的返回值是void,即无法获取异步任务的执行状态,3个

    重载的 submit 方法的返回值是 Future,可以据此获取任务执行的状态和结果。

    package com;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    public class FutureTest {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            // 创建异步执行任务
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            Future<Double> cf = executorService.submit(() -> {
                System.out.println(Thread.currentThread() + " start,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (false) {
                    throw new RuntimeException("test");
                } else {
                    System.out.println(Thread.currentThread() + " exit,time->" + System.currentTimeMillis());
                    return 1.2;
                }
            });
            executorService.shutdown();
            System.out.println("main thread start,time->" + System.currentTimeMillis());
            // 等待子任务执行完成,如果已完成则直接返回结果
            // 如果执行任务异常,则get方法会把之前捕获的异常重新抛出
            System.out.println("run result->" + cf.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    # 程序输出
    main thread start,time->1693019539222
    Thread[pool-1-thread-1,5,main] start,time->1693019539222
    Thread[pool-1-thread-1,5,main] exit,time->1693019541223
    run result->1.2
    main thread exit,time->1693019541226
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    子线程是异步执行的,主线程休眠等待子线程执行完成,子线程执行完成后唤醒主线程,主线程获取任务执行结果

    后退出。

    很多博客说使用不带等待时间限制的get方法时,如果子线程执行异常了会导致主线程长期阻塞,这其实是错误

    的,子线程执行异常时其异常会被捕获,然后修改任务的状态为异常结束并唤醒等待的主线程,get方法判断任务

    状态发生变更,就终止等待了,并抛出异常。将上述用例中if(false)改成if(true) ,执行结果如下:

    main thread start,time->1693019872552
    Thread[pool-1-thread-1,5,main] start,time->1693019872552
    Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: test
    	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    	at com.FutureTest.test(FutureTest.java:34)
    	at com.FutureTest.main(FutureTest.java:11)
    Caused by: java.lang.RuntimeException: test
    	at com.FutureTest.lambda$test$0(FutureTest.java:25)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    	at java.lang.Thread.run(Thread.java:745)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    get方法抛出异常导致主线程异常终止。

    Future 的局限性:它没法直接对多个任务进行链式、组合等处理,需要借助并发工具类才能完成,实现逻辑比较

    复杂。

    而 CompletableFuture 是对 Future 的扩展和增强,CompletableFuture 实现了 Future 接口,并在此基础上进

    行了丰富的扩展,完美弥补了 Future 的局限性,同时 CompletableFuture 实现了对任务编排的能力。借助这项

    能力,可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以

    往,虽然通过 CountDownLatch 等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以

    维护。

    CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、

    流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加顺畅便利。

    CompletionStage 接口定义了任务编排的方法,执行某一阶段,可以向下执行后续阶段。异步执行的,默认线程

    池是 ForkJoinPool.commonPool(),但为了业务之间互不影响,且便于定位问题,强烈推荐使用自定义线程池

    CompletableFuture 中默认线程池如下:

    // 根据commonPool的并行度来选择,而并行度的计算是在ForkJoinPool的静态代码段完成的
    private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);
    
    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    ForkJoinPool 中初始化 commonPool 的参数:

    static {
        // initialize field offsets for CAS etc
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ForkJoinPool.class;
            CTL = U.objectFieldOffset
                (k.getDeclaredField("ctl"));
            RUNSTATE = U.objectFieldOffset
                (k.getDeclaredField("runState"));
            STEALCOUNTER = U.objectFieldOffset
                (k.getDeclaredField("stealCounter"));
            Class<?> tk = Thread.class;
            ……
        } catch (Exception e) {
            throw new Error(e);
        }
    
        commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");
    
        // 调用makeCommonPool方法创建commonPool,其中并行度为逻辑核数-1
        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
        int par = common.config & SMASK; // report 1 even if threads disabled
        commonParallelism = par > 0 ? par : 1;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    3、CompletableFuture功能

    3.1 依赖关系

    thenApply():把前面任务的执行结果,交给后面的Function。

    thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回。

    3.2 and集合关系

    thenCombine():合并任务,有返回值。

    thenAccepetBoth():两个任务执行完成后,将结果交给thenAccepetBoth处理,无返回值。

    runAfterBoth():两个任务都执行完成后,执行下一步操作(Runnable类型任务)。

    3.3 or聚合关系

    applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值。

    acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值。

    runAfterEither():任意一个任务执行完成,进行下一步操作(Runnable类型任务)。

    3.4 并行执行

    allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture。

    anyOf():当任何一个给定的 CompletablFuture 完成时,返回一个新的 CompletableFuture。

    3.5 结果处理

    whenComplete:当任务完成时,将使用结果(或null)和此阶段的异常(或 null如果没有)执行给定操作。

    exceptionally:返回一个新的 CompletableFuture,当前面的 CompletableFuture 完成时,它也完成,当它

    异常完成时,给定函数的异常触发这个 CompletableFuture 的完成。

    3、CompletableFuture(runAsync和supplyAsync)创建异步任务

    CompletableFuture 提供了四个静态方法来创建一个异步操作:

    public static CompletableFuture<Void> runAsync(Runnable runnable)
    
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
    
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这四个方法的区别:

    • runAsync()Runnable函数式接口类型为参数,没有返回结果,supplyAsync()Supplier函数式接

      口类型为参数,返回结果类型为USupplier接口的get()是有返回值的(会阻塞)。

    • 使用没有指定Executor的方法时,内部使用ForkJoinPool.commonPool()作为它的线程池执行异步代码。

      如果指定线程池,则使用指定的线程池运行。

    • 默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是CPU

      的核数(也可以通过JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism来设置

      ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一

      些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统

      的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

    supplyAsync 表示创建带返回值的异步任务的,相当于 ExecutorService submit(Callable task) 方法。

    runAsync 表示创建无返回值的异步任务,相当于 ExecutorService submit(Runnable task)方法。

    这两方法的效果跟 submit 是一样的。

    这两方法各有一个重载版本,可以指定执行异步任务的Executor实现,如果不指定,默认使用

    ForkJoinPool.commonPool(),如果机器是单核的,则默认使用ThreadPerTaskExecutor,该类是一个内部类,每

    次执行execute都会创建一个新线程。

    3.1 runAsync

    runAsync 没有返回值

    package com;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    
    public class RunAsyncTest {
    
        public static void main(String[] args) {
            List<Integer> numberList = new ArrayList<>();
            for (int i = 1; i < 11; i++) {
                numberList.add(i);
            }
            System.out.println("start!");
            long start = System.currentTimeMillis();
            for (Integer number : numberList) {
                CompletableFuture.runAsync(() -> {
                    try {
                        Thread.sleep(1000);
                        System.out.println(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
            long end = System.currentTimeMillis();
            System.out.println("耗时:" + (end - start));
            System.out.println("end!");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    # 程序输出
    start!
    耗时:44
    end!
    
    • 1
    • 2
    • 3
    • 4
    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class Test1 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            // 创建异步执行任务,无返回值
            CompletableFuture cf = CompletableFuture.runAsync(() -> {
                System.out.println(Thread.currentThread() + " start,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if (false) {
                    throw new RuntimeException("test");
                } else {
                    System.out.println(Thread.currentThread() + " exit,time->" + System.currentTimeMillis());
                }
            });
            System.out.println("main thread start,time->" + System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("run result->" + cf.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    # 程序输出
    main thread start,time->1693021661122
    Thread[ForkJoinPool.commonPool-worker-1,5,main] start,time->1693021661122
    Thread[ForkJoinPool.commonPool-worker-1,5,main] exit,time->1693021663123
    run result->null
    main thread exit,time->1693021663123
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * 自定义线程池
     */
    public class Test3 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            // 创建异步执行任务:
            CompletableFuture cf = CompletableFuture.runAsync(() -> {
                System.out.println(Thread.currentThread() + " start,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if (false) {
                    throw new RuntimeException("test");
                } else {
                    System.out.println(Thread.currentThread() + " exit,time->" + System.currentTimeMillis());
                }
            }, executorService);
            System.out.println("main thread start,time->" + System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("run result->" + cf.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    # 程序输出
    main thread start,time->1693022272784
    Thread[pool-1-thread-1,5,main] start,time->1693022272784
    Thread[pool-1-thread-1,5,main] exit,time->1693022274784
    run result->null
    main thread exit,time->1693022274784
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.2 supplyAsync

    supplyAsync 有返回值

    package com;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class SupplyAsynctest {
    
        public static void main(String[] args) {
            List<Integer> numberList = new ArrayList<>();
            for (int i = 1; i < 11; i++) {
                numberList.add(i);
            }
            List<CompletableFuture<Integer>> futureList = new ArrayList<>();
            System.out.println("start!");
            long start = System.currentTimeMillis();
            for (Integer number : numberList) {
                futureList.add(CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(1000);
                        return number;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        return 0;
                    }
                }));
            }
    
            for (CompletableFuture<Integer> completableFuture : futureList) {
                Integer number = null;
                try {
                    number = completableFuture.get();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
                System.out.println(number);
            }
            long end = System.currentTimeMillis();
            System.out.println("耗时:" + (end - start));
            System.out.println("end!");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    # 程序输出
    start!
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    耗时:2047
    end!
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class Test2 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            // 创建异步执行任务,有返回值
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if (false) {
                    throw new RuntimeException("test");
                } else {
                    System.out.println(Thread.currentThread() + " exit,time->" + System.currentTimeMillis());
                    return 1.2;
                }
            });
            System.out.println("main thread start,time->" + System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("run result->" + cf.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    # 程序输出
    main thread start,time->1693021915960
    Thread[ForkJoinPool.commonPool-worker-1,5,main] start,time->1693021915960
    Thread[ForkJoinPool.commonPool-worker-1,5,main] exit,time->1693021917960
    run result->1.2
    main thread exit,time->1693021917962
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    
    public class Test4 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            ForkJoinPool pool = new ForkJoinPool();
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if (false) {
                    throw new RuntimeException("test");
                } else {
                    System.out.println(Thread.currentThread() + " exit,time->" + System.currentTimeMillis());
                    return 1.2;
                }
            }, pool);
            System.out.println("main thread start,time->" + System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("run result->" + cf.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    # 程序输出
    main thread start,time->1693022336576
    Thread[ForkJoinPool-1-worker-1,5,main] start,time->1693022336576
    Thread[ForkJoinPool-1-worker-1,5,main] exit,time->1693022338577
    run result->1.2
    main thread exit,time->1693022338578
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    4、获取结果(get和join)

    join()get()方法都是用来获取CompletableFuture异步之后的返回值。join()方法抛出的是uncheck

    常(即未经检查的异常),不会强制开发者抛出。get()方法抛出的是经过检查的异常,ExecutionException

    InterruptedException 需要用户手动处理(抛出或者 try catch)。

    5、结果处理( whenComplete和exceptionally和handle)

    5.1 whenComplete和exceptionally方法

    CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方

    法:

    public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
    
    public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
    
    public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
    
    public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • whenComplete可以处理正常和异常的计算结果,exceptionally处理异常情况。

    • Action的类型是BiConsumer,它可以处理正常的计算结果,或者异

      常情况。

    • 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用

      相同的线程池,也可能会被同一个线程选中执行)。

    • 这几个方法都会返回CompletableFuture,当Action执行完毕后它的结果返回原始的CompletableFuture

      的计算结果或者返回异常。

    • whenCompletewhenCompleteAsync 的区别:whenComplete是执行当前任务的线程继续执行

      whenComplete的任务。whenCompleteAsync是执行把whenCompleteAsync 这个任务继续提交给线程池来

      进行执行。

    whenComplete 是当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方

    法,如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常

    执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。

    exceptionally方法指定某个任务执行异常时执行的回调方法,会将抛出异常作为参数传递到回调方法中,如果该

    任务正常执行则会exceptionally方法返回的CompletionStage的result就是该任务正常执行的结果。

    package com;
    
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.function.BiConsumer;
    import java.util.function.Function;
    
    public class WhenCompleteAndExceptionally {
    
        public static void main(String[] args) {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Integer randomNum = new Random().nextInt(10);
                if (randomNum % 2 == 0) {
                    System.out.println("任务发生异常,返回给exceptionally!");
                    Integer num = 12 / 0;
                }
                System.out.println("任务处理完成,返回给whenComplete!");
                return randomNum;
            });
    
            // 任务完成或异常方法完成时执行(whenComplete),无论有异常与否,这个方法都会执行
            // 如果出现了异常,任务结果为null
            future.whenComplete(new BiConsumer<Integer, Throwable>() {
                @Override
                public void accept(Integer t, Throwable action) {
                    System.out.println("任务正常,接收supplyAsync的返回值," + "结果是:" + t);
                }
            });
    
            // 出现异常时先执行(exceptionally),然后再执行(whenComplete)
            // 如果没有出现异常,(exceptionally)不会被执行
            future.exceptionally(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable t) {
                    System.out.println("任务异常,接收supplyAsync的返回值,异常是:" + t.getMessage());
                    return -1;
                }
            });
            // 如果发生了异常,此处无法获取返回值
            try {
                Integer result = future.get();
                System.out.println("get()获取到的结果是:" + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    # 程序输出
    # 无异常 supplyAsync->whenComplete->get()
    任务处理完成,返回给whenComplete!
    任务正常,接收supplyAsync的返回值,结果是:9
    get()获取到的结果是:9
    
    • 1
    • 2
    • 3
    • 4
    • 5
    # 程序输出
    # 出现异常 supplyAsync->exceptionally->whenComplete
    任务发生异常,返回给exceptionally!
    任务异常,接收supplyAsync的返回值,异常是:java.lang.ArithmeticException: / by zero
    任务正常,接收supplyAsync的返回值,结果是:null
    java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
    	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    	at com.WhenCompleteAndExceptionally.main(WhenCompleteAndExceptionally.java:48)
    Caused by: java.lang.ArithmeticException: / by zero
    	at com.WhenCompleteAndExceptionally.lambda$main$0(WhenCompleteAndExceptionally.java:22)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
    	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    另一种写法:

    package com;
    
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.function.BiConsumer;
    import java.util.function.Function;
    
    public class WhenCompleteAndExceptionally2 {
    
        public static void main(String[] args) {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                        try {
                            TimeUnit.SECONDS.sleep(1);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        Integer randomNum = new Random().nextInt(10);
                        if (randomNum % 2 == 0) {
                            System.out.println("任务发生异常,返回给exceptionally!");
                            Integer num = 12 / 0;
                        }
                        System.out.println("任务处理完成,返回给whenComplete!");
                        return randomNum;
                    })
    
                    // 任务完成或异常方法完成时执行(whenComplete),无论有异常与否,这个方法都会执行
                    // 如果出现了异常,任务结果为null
                    .whenComplete(new BiConsumer<Integer, Throwable>() {
                        @Override
                        public void accept(Integer t, Throwable action) {
                            System.out.println("任务正常,接收supplyAsync的返回值," + "结果是:" + t);
                        }
                    })
    
                    // 出现异常时先执行(exceptionally),然后再执行(whenComplete)
                    // 如果没有出现异常,(exceptionally)不会被执行
                    .exceptionally(new Function<Throwable, Integer>() {
                        @Override
                        public Integer apply(Throwable t) {
                            System.out.println("任务异常,接收supplyAsync的返回值,异常是:" + t.getMessage());
                            return -1;
                        }
                    });
            // 如果发生了异常,此处无法获取返回值
            try {
                Integer result = future.get();
                System.out.println("get()获取到的结果是:" + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    # 程序输出
    # 无异常 supplyAsync->whenComplete->get()
    任务处理完成,返回给whenComplete!
    任务正常,接收supplyAsync的返回值,结果是:7
    get()获取到的结果是:7
    
    • 1
    • 2
    • 3
    • 4
    • 5
    # 程序输出
    # 有异常 supplyAsync->whenComplete->exceptionally->get()
    任务发生异常,返回给exceptionally!
    任务正常,接收supplyAsync的返回值,结果是:null
    任务异常,接收supplyAsync的返回值,异常是:java.lang.ArithmeticException: / by zero
    get()获取到的结果是:-1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    package com;
    
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.function.BiConsumer;
    import java.util.function.Function;
    
    public class WhenCompleteAndExceptionally3 {
    
        public static void main(String[] args) {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Integer randomNum = new Random().nextInt(10);
                if (randomNum % 2 == 0) {
                    System.out.println("任务发生异常,返回给exceptionally!");
                    Integer num = 12 / 0;
                }
                System.out.println("任务处理完成,返回给whenComplete!");
                return randomNum;
            });
    
            // 任务完成或异常方法完成时执行(whenComplete),无论有异常与否,这个方法都会执行
            // 如果出现了异常,任务结果为null
            CompletableFuture<Integer> future1 = future.whenComplete(new BiConsumer<Integer, Throwable>() {
                @Override
                public void accept(Integer t, Throwable action) {
                    System.out.println("任务正常,接收supplyAsync的返回值," + "结果是:" + t);
                }
            });
    
            // 出现异常时先执行(exceptionally),然后再执行(whenComplete)
            // 如果没有出现异常,(exceptionally)不会被执行
            CompletableFuture<Integer> future2 = future.exceptionally(new Function<Throwable, Integer>() {
                @Override
                public Integer apply(Throwable t) {
                    System.out.println("任务异常,接收supplyAsync的返回值,异常是:" + t.getMessage());
                    return -1;
                }
            });
            // 如果发生了异常,此处无法获取返回值
            try {
                Integer result = future2.get();
                System.out.println("get()获取到的结果是:" + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    # 程序输出
    # 无异常 supplyAsync->whenComplete->get()
    任务处理完成,返回给whenComplete!
    任务正常,接收supplyAsync的返回值,结果是:3
    get()获取到的结果是:3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    # 程序输出
    # 有异常 supplyAsync->whenComplete->exceptionally->get()
    任务发生异常,返回给exceptionally!
    任务异常,接收supplyAsync的返回值,异常是:java.lang.ArithmeticException: / by zero
    任务正常,接收supplyAsync的返回值,结果是:null
    get()获取到的结果是:-1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    package com;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.function.BiConsumer;
    import java.util.function.Function;
    import java.util.function.Supplier;
    
    public class WhenCompleteAndExceptionally4 {
        public static void main(String[] args) {
            CompletableFuture future = CompletableFuture.supplyAsync(new Supplier<Object>() {
                @Override
                public Object get() {
                    System.out.println(Thread.currentThread().getName() + " completableFuture");
                    int i = 10 / 0;
                    return 1024;
                }
            });
            CompletableFuture future1 = future.whenComplete(new BiConsumer<Object, Throwable>() {
                @Override
                public void accept(Object o, Throwable throwable) {
                    System.out.println("-------O=" + o);
                    System.out.println("-------throwable=" + throwable);
                }
            });
            CompletableFuture future2 = future.exceptionally(new Function<Throwable, Object>() {
                @Override
                public Object apply(Throwable throwable) {
                    System.out.println("throwable=" + throwable);
                    return 6666;
                }
            });
            try {
                System.out.println("结果是:" + future2.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    # 程序输出
    # 有异常 supplyAsync->whenComplete->exceptionally->get()
    ForkJoinPool.commonPool-worker-1 completableFuture
    -------O=null
    -------throwable=java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    throwable=java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
    结果是:6666
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class Test5 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + "job1 start,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if (false) {
                    throw new RuntimeException("test");
                } else {
                    System.out.println(Thread.currentThread() + "job1 exit,time->" + System.currentTimeMillis());
                    return 1.2;
                }
            });
            //cf执行完成后会将执行结果和执行过程中抛出的异常传入回调方法,如果是正常执行的则传入的异常为null
            CompletableFuture<Double> cf2 = cf.whenComplete((a, b) -> {
                System.out.println(Thread.currentThread() + "job2 start,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if (b != null) {
                    System.out.println("error stack trace->");
                    b.printStackTrace();
                } else {
                    System.out.println("run succ,result->" + a);
                }
                System.out.println(Thread.currentThread() + "job2 exit,time->" + System.currentTimeMillis());
            });
            //等待子任务执行完成
            System.out.println("main thread start wait,time->" + System.currentTimeMillis());
            //如果cf是正常执行的,cf2.get的结果就是cf执行的结果
            //如果cf是执行异常,则cf2.get会抛出异常
            System.out.println("run result->" + cf2.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    # 程序输出
    Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 start,time->1693027350240
    main thread start wait,time->1693027350240
    Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 exit,time->1693027352241
    Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 start,time->1693027352241
    run succ,result->1.2
    Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 exit,time->1693027354241
    run result->1.2
    main thread exit,time->1693027354241
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    将上述示例中的if(false) 改成if(true),其输出如下:

    # 程序输出
    Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 start,time->1693027394831
    main thread start wait,time->1693027394832
    Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 start,time->1693027396832
    error stack trace->
    Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 exit,time->1693027398834
    java.util.concurrent.CompletionException: java.lang.RuntimeException: test
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
    	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
    Caused by: java.lang.RuntimeException: test
    	at com.test.Test5.lambda$test$0(Test5.java:21)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    	... 5 more
    Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: test
    	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    	at com.test.Test5.test(Test5.java:46)
    	at com.test.Test5.main(Test5.java:9)
    Caused by: java.lang.RuntimeException: test
    	at com.test.Test5.lambda$test$0(Test5.java:21)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
    	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    
    public class Test6 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            ForkJoinPool pool = new ForkJoinPool();
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + "job1 start,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if (false) {
                    throw new RuntimeException("test");
                } else {
                    System.out.println(Thread.currentThread() + "job1 exit,time->" + System.currentTimeMillis());
                    return 1.2;
                }
            }, pool);
            //cf执行异常时,将抛出的异常作为入参传递给回调方法
            CompletableFuture<Double> cf2 = cf.exceptionally((param) -> {
                System.out.println(Thread.currentThread() + " start,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println("error stack trace->");
                param.printStackTrace();
                System.out.println(Thread.currentThread() + " exit,time->" + System.currentTimeMillis());
                return -1.1;
            });
            System.out.println("main thread start,time->" + System.currentTimeMillis());
            //等待子任务执行完成,此处无论是job2和job3都可以实现job2退出,主线程才退出,如果是cf,则主线程不会等待job2执行完成自动退出了
            //cf2.get时,没有异常,但是依然有返回值,就是cf的返回值
            System.out.println("run result->" + cf2.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    # 程序输出
    Thread[ForkJoinPool-1-worker-1,5,main]job1 start,time->1693038412532
    main thread start,time->1693038412532
    Thread[ForkJoinPool-1-worker-1,5,main]job1 exit,time->1693038414533
    run result->1.2
    main thread exit,time->1693038414534
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    将上述示例中的if(true) 改成if(false),其输出如下:

    # 程序输出
    Thread[ForkJoinPool-1-worker-1,5,main]job1 start,time->1693038448098
    main thread start,time->1693038448098
    Thread[ForkJoinPool-1-worker-1,5,main] start,time->1693038450099
    error stack trace->
    Thread[ForkJoinPool-1-worker-1,5,main] exit,time->1693038452104
    run result->-1.1
    main thread exit,time->1693038452107
    java.util.concurrent.CompletionException: java.lang.RuntimeException: test
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
    	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
    Caused by: java.lang.RuntimeException: test
    	at com.test.Test6.lambda$test$0(Test6.java:23)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    	... 5 more
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    5.2 handle 方法

    public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
    public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
    public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
    
    • 1
    • 2
    • 3

    handle是执行任务完成时对结果的处理,handle是在任务完成后再执行,还可以处理异常的任务。

    handleAsync方法即可以获取执行结果,也可以感知异常信息,并能处理执行结果并返回。

    跟whenComplete基本一致,区别在于handle的回调方法有返回值,且handle方法返回的CompletableFuture的

    result是回调方法的执行结果或者回调方法执行期间抛出的异常,与原始CompletableFuture的result无关了。

    package com;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class handleAsyncTest {
    
        public static void main(String[] args) {
            System.out.println("main start ...");
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                System.out.println("开启异步任务...");
                int i = 10 % 2;
                if (i == 0) {
                    throw new RuntimeException("远程服务调用失败");
                }
                return i;
            }).handleAsync((res, thr) -> {
                System.out.println("进入handleAsync方法");
                if (res != null) {
                    return res * 2;
                }
                if (thr != null) {
                    System.out.println("捕获到异常:" + thr);
                    return 0;
                }
                return 10;
            });
            try {
                Integer result = future.get();
                System.out.println("获取异步任务返回值:" + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("main end ...");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    # 程序输出
    # 有异常
    main start ...
    开启异步任务...
    进入handleAsync方法
    捕获到异常:java.util.concurrent.CompletionException: java.lang.RuntimeException: 远程服务调用失败
    获取异步任务返回值:0
    main end ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    # 程序输出
    # 无异常
    main start ...
    开启异步任务...
    进入handleAsync方法
    获取异步任务返回值:2
    main end ...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    package com;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    public class handleAsyncTest2 {
    
        public static void main(String[] args) {
            CountDownLatch countDownLatch = new CountDownLatch(2);
            System.out.println("start!");
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + " 进行一连串操作1....");
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 1;
            });
    
            // whenComplete方法,返回的future的结果还是1
            CompletableFuture<Integer> future = future1.whenComplete((x, y) -> {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " whenComplete,future返回:" + x);
            });
    
            // handler返回的future结果是字符串"2"
            CompletableFuture<String> handle = future.handle((x, y) -> {
                System.out.println(Thread.currentThread().getName() + " handle接收的结果:" + x);
                countDownLatch.countDown();
                return "2";
            });
    
            CompletableFuture<Integer> handle1 = handle.handle((x, y) -> {
                System.out.println(Thread.currentThread().getName() + " handle返回的结果:" + x);
                countDownLatch.countDown();
                return 2;
            });
    
            try {
                countDownLatch.await();
                System.out.println("接收到的返回值为:"+handle1.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("end!");
        }
    }
    package com;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    public class handleAsyncTest2 {
    
        public static void main(String[] args) {
            CountDownLatch countDownLatch = new CountDownLatch(2);
            System.out.println("start!");
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + " 进行一连串操作1....");
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 1;
            });
    
            // whenComplete方法,返回的future的结果还是1
            CompletableFuture<Integer> future = future1.whenComplete((x, y) -> {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " whenComplete,future返回:" + x);
            });
    
            // handler返回的future结果是字符串"2"
            CompletableFuture<String> handle = future.handle((x, y) -> {
                System.out.println(Thread.currentThread().getName() + " handle接收的结果:" + x);
                countDownLatch.countDown();
                return "2";
            });
    
            CompletableFuture<Integer> handle1 = handle.handle((x, y) -> {
                System.out.println(Thread.currentThread().getName() + " handle返回的结果:" + x);
                countDownLatch.countDown();
                return 2;
            });
    
            try {
                countDownLatch.await();
                System.out.println("接收到的返回值为:"+handle1.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            System.out.println("end!");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    # 程序输出
    start!
    ForkJoinPool.commonPool-worker-1 进行一连串操作1....
    ForkJoinPool.commonPool-worker-1 whenComplete,future返回:1
    ForkJoinPool.commonPool-worker-1 handle接收的结果:1
    ForkJoinPool.commonPool-worker-1 handle返回的结果:2
    接收到的返回值为:2
    end!
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class Test7 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + "job1 start,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if (false) {
                    throw new RuntimeException("test");
                } else {
                    System.out.println(Thread.currentThread() + "job1 exit,time->" + System.currentTimeMillis());
                    return 1.2;
                }
            });
            //cf执行完成后会将执行结果和执行过程中抛出的异常传入回调方法,如果是正常执行的则传入的异常为null
            CompletableFuture<String> cf2 = cf.handle((a, b) -> {
                System.out.println(Thread.currentThread() + "job2 start,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if (b != null) {
                    System.out.println("error stack trace->");
                    b.printStackTrace();
                } else {
                    System.out.println("run succ,result->" + a);
                }
                System.out.println(Thread.currentThread() + "job2 exit,time->" + System.currentTimeMillis());
                if (b != null) {
                    return "run error";
                } else {
                    return "run succ";
                }
            });
            //等待子任务执行完成
            System.out.println("main thread start wait,time->" + System.currentTimeMillis());
            //get的结果是cf2的返回值,跟cf没关系了
            System.out.println("run result->" + cf2.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    # 程序输出
    Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 start,time->1693040276755
    main thread start wait,time->1693040276755
    Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 exit,time->1693040278755
    Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 start,time->1693040278755
    run succ,result->1.2
    Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 exit,time->1693040280757
    run result->run succ
    main thread exit,time->1693040280758
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    将上述示例中的if(true) 改成if(false),其输出如下:

    # 程序输出
    Thread[ForkJoinPool.commonPool-worker-1,5,main]job1 start,time->1693040314676
    main thread start wait,time->1693040314677
    Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 start,time->1693040316676
    error stack trace->
    Thread[ForkJoinPool.commonPool-worker-1,5,main]job2 exit,time->1693040318680
    run result->run error
    main thread exit,time->1693040318681
    java.util.concurrent.CompletionException: java.lang.RuntimeException: test
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
    	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
    Caused by: java.lang.RuntimeException: test
    	at com.test.Test7.lambda$test$0(Test7.java:21)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    	... 5 more
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    6、结果转换(thenApply和thenCompose)

    将上一段任务的执行结果作为下一阶段任务的入参参与重新计算,产生新的结果。

    6.1 thenApply

    thenApply接收一个函数作为参数,使用该函数处理上一个CompletableFuture调用的结果,并返回一个具有处

    理结果的Future对象。

    常用使用:

    public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
    
    public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
    
    • 1
    • 2
    • 3

    T:上一个任务返回结果的类型。

    U:当前任务的返回值类型。

    thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传

    递到回调方法中。

    package com;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class ThenApplyTest {
    
        public static void main(String[] args) {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
                int result = 100;
                System.out.println("第一次运算:" + result);
                return result;
            }).thenApply(number -> {
                int result = number * 3;
                System.out.println("第二次运算:" + result);
                return result;
            });
    
            try {
                Integer result = future.get();
                System.out.println("结果是:" + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    # 运行程序
    第一次运算:100
    第二次运算:300
    结果是:300
    
    • 1
    • 2
    • 3
    • 4
    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    
    public class Test8 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            ForkJoinPool pool = new ForkJoinPool();
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());
                return 1.2;
            }, pool);
            //cf关联的异步任务的返回值作为方法入参,传入到thenApply的方法中
            //thenApply这里实际创建了一个新的CompletableFuture实例
            CompletableFuture<String> cf2 = cf.thenApply((result) -> {
                System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
                return "test:" + result;
            });
            System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("run result->" + cf.get());
            System.out.println("main thread start cf2.get(),time->" + System.currentTimeMillis());
            System.out.println("run result->" + cf2.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    # 运行程序
    Thread[ForkJoinPool-1-worker-1,5,main] start job1,time->1693042190558
    main thread start cf.get(),time->1693042190558
    Thread[ForkJoinPool-1-worker-1,5,main] exit job1,time->1693042192558
    Thread[ForkJoinPool-1-worker-1,5,main] start job2,time->1693042192558
    run result->1.2
    main thread start cf2.get(),time->1693042192559
    Thread[ForkJoinPool-1-worker-1,5,main] exit job2,time->1693042194558
    run result->test:1.2
    main thread exit,time->1693042194558
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    job1执行结束后,将job1的方法返回值作为入参传递到job2中并立即执行job2。thenApplyAsync与thenApply的

    区别在于,前者是将job2提交到线程池中异步执行,实际执行job2的线程可能是另外一个线程,后者是由执行

    job1的线程立即执行job2,即两个job都是同一个线程执行的。

    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    
    public class Test9 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            ForkJoinPool pool = new ForkJoinPool();
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());
                return 1.2;
            }, pool);
            //cf关联的异步任务的返回值作为方法入参,传入到thenApply的方法中
            //thenApply这里实际创建了一个新的CompletableFuture实例
            CompletableFuture<String> cf2 = cf.thenApplyAsync((result) -> {
                System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
                return "test:" + result;
            });
            System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("run result->" + cf.get());
            System.out.println("main thread start cf2.get(),time->" + System.currentTimeMillis());
            System.out.println("run result->" + cf2.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    # 程序输出
    Thread[ForkJoinPool-1-worker-1,5,main] start job1,time->1693042362021
    main thread start cf.get(),time->1693042362022
    Thread[ForkJoinPool-1-worker-1,5,main] exit job1,time->1693042364022
    Thread[ForkJoinPool.commonPool-worker-1,5,main] start job2,time->1693042364023
    run result->1.2
    main thread start cf2.get(),time->1693042364024
    Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job2,time->1693042366024
    run result->test:1.2
    main thread exit,time->1693042366024
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    从输出可知,执行job1和job2是两个不同的线程。

    Executor实现,如果不指定,默认使用ForkJoinPool.commonPool()。 下述的多个方法,每个方法都有两个以

    Async结尾的方法,一个使用默认的Executor实现,一个使用指定的Executor实现,不带Async的方法是由触发该

    任务的线程执行该任务,带Async的方法是由触发该任务的线程将任务提交到线程池,执行任务的线程跟触发任务

    的线程不一定是同一个。

    6.2 thenCompose

    thenCompose的参数为一个返回CompletableFuture实例的函数,该函数的参数是先前计算步骤的结果。

    常用方法:

    public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
    
    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
    
    • 1
    • 2
    • 3

    thenCompose 方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会

    返回一个新的CompletableFuture实例,如果该CompletableFuture实例的result不为null,则返回一个基于该

    result的新的CompletableFuture实例,然后执行这个新任务。

    package com;
    
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionStage;
    import java.util.concurrent.ExecutionException;
    import java.util.function.Function;
    import java.util.function.Supplier;
    
    public class ThenComposeTest {
    
        public static void main(String[] args) {
            CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int number = new Random().nextInt(30);
                    System.out.println("第一次运算:" + number);
                    return number;
                }
            }).thenCompose(new Function<Integer, CompletionStage<Integer>>() {
                @Override
                public CompletionStage<Integer> apply(Integer param) {
                    return CompletableFuture.supplyAsync(new Supplier<Integer>() {
                        @Override
                        public Integer get() {
                            int number = param * 2;
                            System.out.println("第二次运算:" + number);
                            return number;
                        }
                    });
                }
            });
            try {
                Integer result = future.get();
                System.out.println("结果是:" + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    # 程序输出
    第一次运算:24
    第二次运算:48
    结果是:48
    
    • 1
    • 2
    • 3
    • 4

    thenApplythenCompose的区别:

    thenApply转换的是泛型中的类型,返回的是同一个CompletableFuture

    thenCompose将内部的CompletableFuture调用展开来并使用上一个CompletableFutre调用的结果在下一步的

    CompletableFuture调用中进行运算,是生成一个新的CompletableFuture

    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class Test10 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());
                return 1.2;
            });
            CompletableFuture<String> cf2 = cf.thenCompose((param) -> {
                System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
                return CompletableFuture.supplyAsync(() -> {
                    System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                    }
                    System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());
                    return "job3 test";
                });
            });
            System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("cf run result->" + cf.get());
            System.out.println("main thread start cf2.get(),time->" + System.currentTimeMillis());
            System.out.println("cf2 run result->" + cf2.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    # 程序输出
    Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time->1693043028531
    main thread start cf.get(),time->1693043028531
    Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job1,time->1693043030531
    Thread[ForkJoinPool.commonPool-worker-1,5,main] start job2,time->1693043030531
    cf run result->1.2
    main thread start cf2.get(),time->1693043030534
    Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job2,time->1693043032532
    Thread[ForkJoinPool.commonPool-worker-1,5,main] start job3,time->1693043032533
    Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job3,time->1693043034534
    cf2 run result->job3 test
    main thread exit,time->1693043034534
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    job1执行完成后job2开始执行,等job2执行完成后会把job3返回,然后执行job3,等job3执行完成后,主线程退

    出。

    7、结果消费(thenAccept和thenAcceptBoth和thenRun)

    与结果处理和结果转换系列函数返回一个新的CompletableFuture不同,结果消费系列函数只对结果执行

    Action,而不返回新的计算值。

    根据对结果的处理方式,结果消费函数又可以分为下面三大类:

    thenAccept():对单个结果进行消费。

    thenAcceptBoth():对两个结果进行消费。

    thenRun():不关心结果,只对结果执行Action

    7.1 thenAccept

    观察该系列函数的参数类型可知,它们是函数式接口Consumer,这个接口只有输入,没有返回值。

    thenAccept 同 thenApply 接收上一个任务的返回值作为参数,但是无返回值。

    常用方法:

    public CompletionStage<Void> thenAccept(Consumer<? super T> action);
    
    public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
    
    • 1
    • 2
    • 3
    package com;
    
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    
    public class ThenAcceptTest {
    
        public static void main(String[] args) {
            CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                int number = new Random().nextInt(10);
                System.out.println("第一次运算:" + number);
                return number;
            }).thenAccept(number -> System.out.println("第二次运算:" + number * 5));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    # 程序输出
    第一次运算:3
    第二次运算:15
    
    • 1
    • 2
    • 3
    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    
    public class Test11 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            ForkJoinPool pool = new ForkJoinPool();
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());
                return 1.2;
            }, pool);
            // cf关联的异步任务的返回值作为方法入参,传入到thenApply的方法中
            CompletableFuture cf2 = cf.thenApply((result) -> {
                System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
                return "test:" + result;
            }).thenAccept((result) -> {
                //接收上一个任务的执行结果作为入参,但是没有返回值
                System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(result);
                System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());
            }).thenRun(() -> {
                //无入参,也没有返回值
                System.out.println(Thread.currentThread() + " start job4,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println("thenRun do something");
                System.out.println(Thread.currentThread() + " exit job4,time->" + System.currentTimeMillis());
            });
            System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("run result->" + cf.get());
            System.out.println("main thread start cf2.get(),time->" + System.currentTimeMillis());
            //cf2 等待最后一个thenRun执行完成
            System.out.println("run result->" + cf2.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    # 程序输出
    Thread[ForkJoinPool-1-worker-1,5,main] start job1,time->1693043895706
    main thread start cf.get(),time->1693043895707
    Thread[ForkJoinPool-1-worker-1,5,main] exit job1,time->1693043897706
    Thread[ForkJoinPool-1-worker-1,5,main] start job2,time->1693043897706
    run result->1.2
    main thread start cf2.get(),time->1693043897707
    Thread[ForkJoinPool-1-worker-1,5,main] exit job2,time->1693043899706
    Thread[ForkJoinPool-1-worker-1,5,main] start job3,time->1693043899706
    test:1.2
    Thread[ForkJoinPool-1-worker-1,5,main] exit job3,time->1693043901707
    Thread[ForkJoinPool-1-worker-1,5,main] start job4,time->1693043901707
    thenRun do something
    Thread[ForkJoinPool-1-worker-1,5,main] exit job4,time->1693043903707
    run result->null
    main thread exit,time->1693043903707
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    
    public class Test12 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            ForkJoinPool pool = new ForkJoinPool();
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + "job1 start,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                if (true) {
                    throw new RuntimeException("test");
                } else {
                    System.out.println(Thread.currentThread() + "job1 exit,time->" + System.currentTimeMillis());
                    return 1.2;
                }
            }, pool);
            //cf执行异常时,将抛出的异常作为入参传递给回调方法
            CompletableFuture<Double> cf2 = cf.exceptionally((param) -> {
                System.out.println(Thread.currentThread() + " start,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println("error stack trace->");
                param.printStackTrace();
                System.out.println(Thread.currentThread() + " exit,time->" + System.currentTimeMillis());
                return -1.1;
            });
            //cf正常执行时执行的逻辑,如果执行异常则不调用此逻辑
            cf2.thenAccept((param) -> {
                System.out.println(Thread.currentThread() + "job2 start,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println("param->" + param);
                System.out.println(Thread.currentThread() + "job2 exit,time->" + System.currentTimeMillis());
            });
            System.out.println("main thread start,time->" + System.currentTimeMillis());
            //等待子任务执行完成,此处无论是job2和job3都可以实现job2退出,主线程才退出,如果是cf,则主线程不会等待job2执行完成自动退出了
            //cf2.get时,没有异常,但是依然有返回值,就是cf的返回值
            System.out.println("run result->" + cf2.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    # 程序输出
    Thread[ForkJoinPool-1-worker-1,5,main]job1 start,time->1693044222992
    main thread start,time->1693044222993
    Thread[ForkJoinPool-1-worker-1,5,main] start,time->1693044224993
    error stack trace->
    Thread[ForkJoinPool-1-worker-1,5,main] exit,time->1693044226996
    Thread[ForkJoinPool-1-worker-1,5,main]job2 start,time->1693044226996
    java.util.concurrent.CompletionException: java.lang.RuntimeException: test
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)
    	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
    Caused by: java.lang.RuntimeException: test
    	at com.test.Test12.lambda$test$0(Test12.java:23)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    	... 5 more
    param->-1.1
    Thread[ForkJoinPool-1-worker-1,5,main]job2 exit,time->1693044228998
    run result->-1.1
    main thread exit,time->1693044228999
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    将上述示例中的if(true) 改成if(false),其输出如下:

    # 程序输出
    Thread[ForkJoinPool-1-worker-1,5,main]job1 start,time->1693044349555
    main thread start,time->1693044349556
    Thread[ForkJoinPool-1-worker-1,5,main]job1 exit,time->1693044351556
    Thread[ForkJoinPool-1-worker-1,5,main]job2 start,time->1693044351556
    param->1.2
    Thread[ForkJoinPool-1-worker-1,5,main]job2 exit,time->1693044353559
    run result->1.2
    main thread exit,time->1693044353559
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    cf2没有指定,其result就是cf执行的结果,理论上cf2.get应该立即返回的,此处是等待了cf3,即job2执行完成后

    才返回。

    7.2 thenAcceptBoth

    thenAcceptBoth函数的作用是,当两个CompletionStage都正常完成计算的时候,就会执行提供的action

    费两个异步的结果。

    常用方法:

    public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
    
    public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
    
    • 1
    • 2
    • 3

    thenAcceptBoth 将两个任务的执行结果作为方法入参,但是无返回值。

    package com;
    
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.function.BiConsumer;
    import java.util.function.Supplier;
    
    public class ThenAcceptBothTest {
    
        public static void main(String[] args) {
            System.out.println("main thread start time->" + System.currentTimeMillis());
            CountDownLatch countDownLatch = new CountDownLatch(1);
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int number = new Random().nextInt(3) + 1;
                    try {
                        TimeUnit.SECONDS.sleep(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务1结果:" + number);
                    return number;
                }
            });
    
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int number = new Random().nextInt(3) + 1;
                    try {
                        TimeUnit.SECONDS.sleep(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务2结果:" + number);
                    return number;
                }
            });
    
            future1.thenAcceptBoth(future2, new BiConsumer<Integer, Integer>() {
                @Override
                public void accept(Integer x, Integer y) {
                    System.out.println("最终结果:" + (x + y));
                    countDownLatch.countDown();
                }
            });
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("main thread end time->" + System.currentTimeMillis());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    # 程序输出
    main thread start time->1693053398891
    任务1结果:3
    任务2结果:3
    最终结果:6
    main thread end time->1693053401896
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    7.3 thenRun

    thenRun也是对线程任务结果的一种消费函数,与thenAccept不同的是,thenRun会在上一阶段

    CompletableFuture计算完成的时候执行一个Runnable,而Runnable并不使用该CompletableFuture计算的

    结果。

    thenRun 的方法没有入参,也没有返回值。

    常用方法:

    public CompletionStage<Void> thenRun(Runnable action);
    
    public CompletionStage<Void> thenRunAsync(Runnable action);
    
    • 1
    • 2
    • 3
    package com;
    
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    
    public class ThenRunTest {
    
        public static void main(String[] args) {
            CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
                int number = new Random().nextInt(10);
                System.out.println("第一阶段:" + number);
                return number;
            }).thenRun(() -> System.out.println("thenRun 执行"));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    # 程序输出
    第一阶段:8
    thenRun 执行
    
    • 1
    • 2
    • 3

    8、结果组合(thenCombine)

    8.1 thenCombine

    合并两个线程任务的结果,并进一步处理。

    常用方法:

    public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
    
    public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
    
    public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    thenCombine 会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值。

    package com;
    
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.function.BiFunction;
    import java.util.function.Supplier;
    
    public class ThenCombineTest {
    
        public static void main(String[] args) {
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int number = new Random().nextInt(10);
                    System.out.println("任务1结果:" + number);
                    return number;
                }
            });
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int number = new Random().nextInt(10);
                    System.out.println("任务2结果:" + number);
                    return number;
                }
            });
            CompletableFuture<Integer> result = future1.thenCombine(future2, new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer x, Integer y) {
                    return x + y;
                }
            });
            try {
                Integer integer = result.get();
                System.out.println("结果是:" + integer);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    # 程序输出
    任务1结果:0
    任务2结果:4
    结果是:4
    
    • 1
    • 2
    • 3
    • 4

    thenCombine / thenAcceptBoth / runAfterBoth:

    这三个方法都是将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务,区别在于,

    thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;thenAcceptBoth同

    样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务

    中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。

    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ForkJoinPool;
    
    public class Test13 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            ForkJoinPool pool = new ForkJoinPool();
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());
                return 1.2;
            });
            CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
                return 3.2;
            });
            //cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,且有返回值
            CompletableFuture<Double> cf3 = cf.thenCombine(cf2, (a, b) -> {
                System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());
                System.out.println("job3 param a->" + a + ",b->" + b);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());
                return a + b;
            });
    
            //cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,无返回值
            CompletableFuture cf4 = cf.thenAcceptBoth(cf2, (a, b) -> {
                System.out.println(Thread.currentThread() + " start job4,time->" + System.currentTimeMillis());
                System.out.println("job4 param a->" + a + ",b->" + b);
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job4,time->" + System.currentTimeMillis());
            });
    
            //cf4和cf3都执行完成后,执行cf5,无入参,无返回值
            CompletableFuture cf5 = cf4.runAfterBoth(cf3, () -> {
                System.out.println(Thread.currentThread() + " start job5,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                System.out.println("cf5 do something");
                System.out.println(Thread.currentThread() + " exit job5,time->" + System.currentTimeMillis());
            });
    
            System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("cf run result->" + cf.get());
            System.out.println("main thread start cf5.get(),time->" + System.currentTimeMillis());
            System.out.println("cf5 run result->" + cf5.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    # 程序输出
    Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time->1693053679581
    Thread[ForkJoinPool.commonPool-worker-2,5,main] start job2,time->1693053679582
    main thread start cf.get(),time->1693053679583
    Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job2,time->1693053681082
    Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job1,time->1693053681582
    Thread[ForkJoinPool.commonPool-worker-1,5,main] start job4,time->1693053681582
    Thread[main,5,main] start job3,time->1693053681582
    job3 param a->1.2,b->3.2
    job4 param a->1.2,b->3.2
    Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job4,time->1693053683085
    Thread[main,5,main] exit job3,time->1693053683585
    Thread[main,5,main] start job5,time->1693053683585
    cf5 do something
    Thread[main,5,main] exit job5,time->1693053684586
    cf run result->1.2
    main thread start cf5.get(),time->1693053684586
    cf5 run result->null
    main thread exit,time->1693053684586
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    job1 和 job2几乎同时运行,job2比job1先执行完成,等job1退出后,job3和job4几乎同时开始运行,job4先退

    出,等job3执行完成,job5开始了,等job5执行完成后,主线程退出。

    9、任务交互(applyToEither和acceptEither和runAfterEither和anyOf和allOf和runAfterBoth)

    线程交互指将两个线程任务获取结果的速度相比较,按一定的规则进行下一步处理。

    9.1 applyToEither

    两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的转化操作。

    常用方法:

    public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
    
    public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
    
    • 1
    • 2
    • 3

    applyToEither 会将已经执行完成的任务的执行结果作为方法入参,并有返回值。

    package com;
    
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Function;
    import java.util.function.Supplier;
    
    public class ApplyToEitherTest {
    
        public static void main(String[] args) {
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int number = new Random().nextInt(10);
                    try {
                        TimeUnit.SECONDS.sleep(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务1结果:" + number);
                    return number;
                }
            });
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int number = new Random().nextInt(10);
                    try {
                        TimeUnit.SECONDS.sleep(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务2结果:" + number);
                    return number;
                }
            });
    
            future1.applyToEither(future2, new Function<Integer, Integer>() {
                @Override
                public Integer apply(Integer number) {
                    System.out.println("最快结果:" + number);
                    return number * 2;
                }
            });
    
            try {
                Integer result = future1.get();
                System.out.println("结果是:" + result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    # 程序输出
    任务2结果:4
    最快结果:4
    任务1结果:8
    结果是:8
    
    • 1
    • 2
    • 3
    • 4
    • 5

    9.2 acceptEither

    两个线程任务相比较,先获得执行结果的,就对该结果进行下一步的消费操作。

    常用方法:

    public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action);
    
    public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action);
    
    • 1
    • 2
    • 3

    acceptEither同样将已经执行完成的任务的执行结果作为方法入参,但是没有返回值。

    package com;
    
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    
    public class AcceptEitherTest {
    
        public static void main(String[] args) {
            CountDownLatch countDownLatch = new CountDownLatch(1);
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int number = new Random().nextInt(10) + 1;
                    try {
                        TimeUnit.SECONDS.sleep(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("第一阶段:" + number);
                    return number;
                }
            });
    
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int number = new Random().nextInt(10) + 1;
                    try {
                        TimeUnit.SECONDS.sleep(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("第二阶段:" + number);
                    return number;
                }
            });
    
            future1.acceptEither(future2, new Consumer<Integer>() {
                @Override
                public void accept(Integer number) {
                    System.out.println("最快结果:" + number);
                    countDownLatch.countDown();
                }
            });
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    # 程序输出
    第二阶段:2
    最快结果:2
    
    • 1
    • 2
    • 3

    9.3 runAfterEither

    两个线程任务相比较,有任何一个执行完成,就进行下一步操作,不关心运行结果。

    常用方法:

    public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action);
    
    public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action);
    
    • 1
    • 2
    • 3

    runAfterEither没有方法入参,也没有返回值。

    package com;
    
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;
    
    public class RunAfterEitherTest {
    
        public static void main(String[] args) {
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int number = new Random().nextInt(5);
                    try {
                        TimeUnit.SECONDS.sleep(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务1结果:" + number);
                    return number;
                }
            });
    
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(new Supplier<Integer>() {
                @Override
                public Integer get() {
                    int number = new Random().nextInt(5);
                    try {
                        TimeUnit.SECONDS.sleep(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("任务2结果:" + number);
                    return number;
                }
            });
    
            future1.runAfterEither(future2, new Runnable() {
                @Override
                public void run() {
                    System.out.println("已经有一个任务完成了");
                }
            }).join();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    # 程序输出
    任务2结果:4
    任务1结果:4
    已经有一个任务完成了
    
    • 1
    • 2
    • 3
    • 4

    applyToEither / acceptEither / runAfterEither:

    这三个方法都是将两个CompletableFuture组合起来,只要其中一个执行完了就会执行某个任务,其区别在于

    applyToEither会将已经执行完成的任务的执行结果作为方法入参,并有返回值;acceptEither同样将已经执行完

    成的任务的执行结果作为方法入参,但是没有返回值;runAfterEither没有方法入参,也没有返回值。注意两个任

    务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。

    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class Test14 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());
                return 1.2;
            });
            CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
                return 3.2;
            });
            //cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,且有返回值
            CompletableFuture<Double> cf3 = cf.applyToEither(cf2, (result) -> {
                System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());
                System.out.println("job3 param result->" + result);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());
                return result;
            });
    
            //cf和cf2的异步任务都执行完成后,会将其执行结果作为方法入参传递给cf3,无返回值
            CompletableFuture cf4 = cf.acceptEither(cf2, (result) -> {
                System.out.println(Thread.currentThread() + " start job4,time->" + System.currentTimeMillis());
                System.out.println("job4 param result->" + result);
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job4,time->" + System.currentTimeMillis());
            });
    
            //cf4和cf3都执行完成后,执行cf5,无入参,无返回值
            CompletableFuture cf5 = cf4.runAfterEither(cf3, () -> {
                System.out.println(Thread.currentThread() + " start job5,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
                System.out.println("cf5 do something");
                System.out.println(Thread.currentThread() + " exit job5,time->" + System.currentTimeMillis());
            });
    
            System.out.println("main thread start cf.get(),time->" + System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("cf run result->" + cf.get());
            System.out.println("main thread start cf5.get(),time->" + System.currentTimeMillis());
            System.out.println("cf5 run result->" + cf5.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    # 程序输出
    Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time->1693054246333
    Thread[ForkJoinPool.commonPool-worker-2,5,main] start job2,time->1693054246334
    main thread start cf.get(),time->1693054246334
    Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job2,time->1693054247835
    Thread[ForkJoinPool.commonPool-worker-2,5,main] start job4,time->1693054247835
    job4 param result->3.2
    Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job1,time->1693054248335
    Thread[ForkJoinPool.commonPool-worker-1,5,main] start job3,time->1693054248335
    cf run result->1.2
    job3 param result->1.2
    main thread start cf5.get(),time->1693054248335
    Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job4,time->1693054249339
    Thread[ForkJoinPool.commonPool-worker-2,5,main] start job5,time->1693054249339
    Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job3,time->1693054250336
    cf5 do something
    Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job5,time->1693054250339
    cf5 run result->null
    main thread exit,time->1693054250339
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    job1 和job2 同时开始运行,job2先执行完成,然后job4开始执行,理论上job3和job4应该同时开始运行,但是此

    时只有job4开始执行了,job3是等待job1执行完成后才开始执行,job4先于job3执行完成,然后job5开始执行,

    等job5执行完成后,主线程退出。

    9.4 anyOf

    anyOf() 的参数是多个给定的 CompletableFuture,当其中的任何一个完成时,方法返回这个

    CompletableFuture

    常用方法:

    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
    
    • 1

    anyOf返回的CompletableFuture是多个任务只要其中一个执行完成就会执行,其get返回的是已经执行完成的任

    务的执行结果,如果该任务执行异常,则抛出异常。

    package com;
    
    import java.util.Random;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    public class AnyOfTest {
    
        public static void main(String[] args) {
            Random random = new Random();
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(random.nextInt(5));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "hello";
            });
    
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(random.nextInt(1));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "world";
            });
            CompletableFuture<Object> result = CompletableFuture.anyOf(future1, future2);
            try {
                Object object = result.get();
                System.out.println(object);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    # 程序输出
    world
    
    • 1
    • 2
    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class Test15 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());
                return 1.2;
            });
            CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
                return 3.2;
            });
            CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(1300);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());
                return 2.2;
            });
            //allof等待所有任务执行完成才执行cf4,如果有一个任务异常终止,则cf4.get时会抛出异常,都是正常执行,cf4.get返回null
            //anyOf是只有一个任务执行完成,无论是正常执行或者执行异常,都会执行cf4,cf4.get的结果就是已执行完成的任务的执行结果
            CompletableFuture cf4 = CompletableFuture.anyOf(cf, cf2, cf3).whenComplete((a, b) -> {
                if (b != null) {
                    System.out.println("error stack trace->");
                    b.printStackTrace();
                } else {
                    System.out.println("run succ,result->" + a);
                }
            });
    
            System.out.println("main thread start cf4.get(),time->" + System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("cf4 run result->" + cf4.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    # 程序输出
    Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time->1693054996820
    Thread[ForkJoinPool.commonPool-worker-2,5,main] start job2,time->1693054996820
    Thread[ForkJoinPool.commonPool-worker-3,5,main] start job3,time->1693054996821
    main thread start cf4.get(),time->1693054996821
    Thread[ForkJoinPool.commonPool-worker-3,5,main] exit job3,time->1693054998122
    run succ,result->2.2
    cf4 run result->2.2
    main thread exit,time->1693054998125
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    9.5 allOf

    allOf方法用来实现多 CompletableFuture 的同时返回。

    常用方法:

    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
    
    • 1

    allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的

    CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。

    package com;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    public class AllOfTest {
    
        public static void main(String[] args) {
            CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("future1完成!");
                return "future1完成!";
            });
    
            CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
                System.out.println("future2完成!");
                return "future2完成!";
            });
    
            CompletableFuture<Void> completableFuture = CompletableFuture.allOf(future1, future2);
            try {
                String result = String.valueOf(completableFuture.get());
                System.out.println(result);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    # 程序输出
    future2完成!
    future1完成!
    future1完成!
    future2完成!
    null
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    package com.test;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    
    public class Test16 {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            test();
        }
    
        public static void test() throws ExecutionException, InterruptedException {
            // 创建异步执行任务:
            CompletableFuture<Double> cf = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start job1,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job1,time->" + System.currentTimeMillis());
                return 1.2;
            });
            CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start job2,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                }
                System.out.println(Thread.currentThread() + " exit job2,time->" + System.currentTimeMillis());
                return 3.2;
            });
            CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread() + " start job3,time->" + System.currentTimeMillis());
                try {
                    Thread.sleep(1300);
                } catch (InterruptedException e) {
                }
    //            throw new RuntimeException("test");
                System.out.println(Thread.currentThread() + " exit job3,time->" + System.currentTimeMillis());
                return 2.2;
            });
            //allof等待所有任务执行完成才执行cf4,如果有一个任务异常终止,则cf4.get时会抛出异常,都是正常执行,cf4.get返回null
            //anyOf是只有一个任务执行完成,无论是正常执行或者执行异常,都会执行cf4,cf4.get的结果就是已执行完成的任务的执行结果
            CompletableFuture cf4 = CompletableFuture.allOf(cf, cf2, cf3).whenComplete((a, b) -> {
                if (b != null) {
                    System.out.println("error stack trace->");
                    b.printStackTrace();
                } else {
                    System.out.println("run succ,result->" + a);
                }
            });
    
            System.out.println("main thread start cf4.get(),time->" + System.currentTimeMillis());
            //等待子任务执行完成
            System.out.println("cf4 run result->" + cf4.get());
            System.out.println("main thread exit,time->" + System.currentTimeMillis());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    # 程序输出
    Thread[ForkJoinPool.commonPool-worker-1,5,main] start job1,time->1693055202550
    Thread[ForkJoinPool.commonPool-worker-2,5,main] start job2,time->1693055202550
    Thread[ForkJoinPool.commonPool-worker-3,5,main] start job3,time->1693055202550
    main thread start cf4.get(),time->1693055202551
    Thread[ForkJoinPool.commonPool-worker-3,5,main] exit job3,time->1693055203852
    Thread[ForkJoinPool.commonPool-worker-2,5,main] exit job2,time->1693055204051
    Thread[ForkJoinPool.commonPool-worker-1,5,main] exit job1,time->1693055204551
    run succ,result->null
    cf4 run result->null
    main thread exit,time->1693055204551
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    主线程等待最后一个job1执行完成后退出。

    9.6 join

    package com;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    
    public class JoinTest {
    
        public static void main(String[] args) throws InterruptedException {
    
            List<Integer> numberList = new ArrayList<>();
            for (int i = 1; i < 11; i++) {
                numberList.add(i);
            }
            List<CompletableFuture<?>> futureList = new ArrayList<>();
            System.out.println("start!");
            long start = System.currentTimeMillis();
            for (Integer number : numberList) {
                futureList.add(CompletableFuture.runAsync(() -> {
                    try {
                        Thread.sleep(1000);
                        System.out.println(number);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }));
            }
            CompletableFuture.allOf(futureList.toArray(new CompletableFuture[futureList.size()])).join();
            long end = System.currentTimeMillis();
            System.out.println("end!");
            System.out.println("耗时:" + (end - start));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    # 程序输出
    start!
    4
    1
    2
    3
    5
    6
    7
    8
    9
    10
    end!
    耗时:2046
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    package com;
    
    import java.util.concurrent.CompletableFuture;
    
    public class JoinTest2 {
        public static void main(String[] args) {
            CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
                System.out.println("Task 1 started");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task 1 completed");
            });
    
            CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
                System.out.println("Task 2 started");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task 2 completed");
            });
    
            CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
                System.out.println("Task 3 started");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("Task 3 completed");
            });
    
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
    
            allFutures.thenRun(() -> {
                System.out.println("All tasks completed");
            });
    
            // 防止 JVM 在 CompletableFuture 执行完之前退出
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    # 程序输出
    Task 1 started
    Task 2 started
    Task 3 started
    Task 1 completed
    Task 2 completed
    Task 3 completed
    All tasks completed
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    package com;
    
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.stream.Collectors;
    import java.util.stream.IntStream;
    
    public class JoinTest3 {
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newFixedThreadPool(10);
            List<CompletableFuture<Void>> futures = IntStream.rangeClosed(1, 10).mapToObj(n -> CompletableFuture.runAsync(() -> {
                System.out.println("done " + n);
            }, threadPool)).collect(Collectors.toList());
            futures.forEach(CompletableFuture::join);
            System.out.println("all done");
            threadPool.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    # 程序输出
    done 2
    done 5
    done 4
    done 3
    done 1
    done 6
    done 7
    done 8
    done 9
    done 10
    all done
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    9.7 runAfterBoth

    runAfterBoth:组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务。

    package com;
    
    import java.util.concurrent.CompletableFuture;
    
    public class RunAfterBothTest {
    
        public static void main(String[] args) {
    
            CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
                System.out.println("线程1开始了" + Thread.currentThread().getName());
                int i = 100 / 10;
                System.out.println("线程1结束了" + Thread.currentThread().getName());
                return i;
            });
    
            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
                System.out.println("线程2开始了" + Thread.currentThread().getName());
                int i = 100 / 5;
                System.out.println("线程2结束了" + Thread.currentThread().getName());
                return i;
            });
    
            // 希望在future1 future2任务执行完之后执行future3
            // runAfterBothAsync不能获取前面两个线程的返回结果,本身也没有返回结果
            CompletableFuture<Void> voidCompletableFuture = future1.runAfterBothAsync(future2, () -> {
                System.out.println("线程3执行了");
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    # 程序输出
    线程1开始了ForkJoinPool.commonPool-worker-1
    线程1结束了ForkJoinPool.commonPool-worker-1
    线程2开始了ForkJoinPool.commonPool-worker-2
    线程2结束了ForkJoinPool.commonPool-worker-2
    线程3执行了
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    10、CompletableFuture常用方法总结

    在这里插入图片描述

    11、实现最优的烧水泡茶程序

    著名数学家华罗庚先生在《统筹方法》这篇文章里介绍了一个烧水泡茶的例子,文中提到最优的工序应该是下面这

    样:

    在这里插入图片描述

    11.1 基于Future实现

    package com;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 烧茶案例
     */
    public class FutureTaskTest {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            long start = System.currentTimeMillis();
    
            // 创建任务T2的FutureTask
            FutureTask<String> ft2 = new FutureTask<>(new T2Task());
    
            // 创建任务T1的FutureTask
            FutureTask<String> ft1 = new FutureTask<>(new T1Task(ft2));
    
            // 线程T1执行任务ft2
            Thread T1 = new Thread(ft2);
            T1.start();
    
            // 线程T2执行任务ft1
            Thread T2 = new Thread(ft1);
            T2.start();
    
            // 等待线程T1执行结果
            System.out.println(ft1.get());
    
            long end = System.currentTimeMillis();
    
            System.out.println("耗时:" + (end - start));
    
        }
    }
    
    // T1Task需要执行的任务
    // 洗水壶、烧开水、泡茶
    class T1Task implements Callable<String> {
    
        FutureTask<String> ft2;
    
        // T1任务需要T2任务的FutureTask
        T1Task(FutureTask<String> ft2) {
            this.ft2 = ft2;
        }
    
        @Override
        public String call() throws Exception {
            // 洗水壶
            System.out.println("T1:洗水壶...");
            TimeUnit.SECONDS.sleep(1);
            // 烧开水
            System.out.println("T1:烧开水...");
            TimeUnit.SECONDS.sleep(15);
            // 获取T2线程的茶叶
            String tf = ft2.get();
            System.out.println("T1:拿到茶叶:" + tf);
            // 泡茶
            System.out.println("T1:泡茶...");
            return "上茶:" + tf;
        }
    }
    
    // T2Task需要执行的任务
    // 洗茶壶、洗茶杯、拿茶叶
    class T2Task implements Callable<String> {
    
        @Override
        public String call() throws Exception {
            // 洗茶壶
            System.out.println("T2:洗茶壶...");
            TimeUnit.SECONDS.sleep(1);
            // 洗茶杯
            System.out.println("T2:洗茶杯...");
            TimeUnit.SECONDS.sleep(2);
            // 拿茶叶
            System.out.println("T2:拿茶叶...");
            TimeUnit.SECONDS.sleep(1);
            return "龙井";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    # 程序输出
    T2:洗茶壶...
    T1:洗水壶...
    T2:洗茶杯...
    T1:烧开水...
    T2:拿茶叶...
    T1:拿到茶叶:龙井
    T1:泡茶...
    上茶:龙井
    耗时:16003
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    11.2 基于CompletableFuture实现

    package com;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureTest {
    
        public static void main(String[] args) {
            long start = System.currentTimeMillis();
            // 任务1:洗水壶->烧开水
            CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> {
                // 洗水壶
                System.out.println("T1:洗水壶...");
                sleep(1, TimeUnit.SECONDS);
                // 烧开水
                System.out.println("T1:烧开水...");
                sleep(15, TimeUnit.SECONDS);
            });
    
            // 任务2:洗茶壶->洗茶杯->拿茶叶
            CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
                // 洗茶壶
                System.out.println("T2:洗茶壶...");
                sleep(1, TimeUnit.SECONDS);
                // 洗茶杯
                System.out.println("T2:洗茶杯...");
                sleep(2, TimeUnit.SECONDS);
                // 拿茶叶
                System.out.println("T2:拿茶叶...");
                sleep(1, TimeUnit.SECONDS);
                return "龙井";
            });
    
            // 任务3:任务1和任务2完成后执行:泡茶
            CompletableFuture<String> f3 = f1.thenCombine(f2, (a, b) -> {
                System.out.println("T1:拿到茶叶:" + b);
                System.out.println("T1:泡茶...");
                return "上茶:" + b;
            });
    
            //等待任务3执行结果
            System.out.println(f3.join());
    
            long end = System.currentTimeMillis();
    
            System.out.println("耗时:" + (end - start));
        }
    
        static void sleep(int t, TimeUnit u) {
            try {
                u.sleep(t);
            } catch (InterruptedException e) {
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    结果:

    # 程序输出
    T1:洗水壶...
    T2:洗茶壶...
    T1:烧开水...
    T2:洗茶杯...
    T2:拿茶叶...
    T1:拿到茶叶:龙井
    T1:泡茶...
    上茶:龙井
    耗时:16055
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
  • 相关阅读:
    JMeter处理接口签名sign
    充气膜结构的应用领域
    做题杂记222
    FFmpeg开发笔记(一)搭建Linux系统的开发环境
    x86_64 ubuntu22.04 源码编译WebKit-7615.3.12.11.3
    怎样查看kafka写数据送到topic是否成功
    Kafka (七) --------- Kafka-Eagle 监控
    基于FFmpeg+SDL的视频播放器的制作
    无序数组计算排序后最大相邻差
    openGauss学习笔记-126 openGauss 数据库管理-设置账本数据库-归档账本数据库
  • 原文地址:https://blog.csdn.net/qq_30614345/article/details/132521183