• 【JUC2022】第一章 CompletableFuture


    【JUC2022】第一章 CompletableFuture

    一、start() 方法

    创建一个 Thread 类,并调用 start() 方法启动线程

    public class ThreadBaseDemo {
        public static void main(String[] args) {
            Thread t1 = new Thread(() -> {},"t1");
            t1.start();
        }
    }
    

    进入到源码,可以发现该方法的底层实现是一个native方法start0(),而在C++中该方法又让JVM执行StartThread命令。我们知道,JVM本质是一个C++程序,也就是说最终是一个C++程序调用了操作系统的指令,启动了一个线程

    二、CompletableFuture

    学习该模块,有助于精进异步编程

    1.Future接口

    package java.util.concurrent;
    在这里插入图片描述

    Future 接口定义了操作异步任务的一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等

    比如主线程创建了一个子线程去执行任务,并且子线程可能比较耗时,主线程在启动子线程后就去做其他事情了。等到主线程过了一会做完其他事情后,才去获取子任务的执行结果,或者变更任务的状态

    总结成一句话:Future接口可以为主线程创建一个子线程,去处理一些费时费力的复杂业务

    2.FutureTask

    实现Runable接口可以定义一个线程类,实现Callable接口不仅可以定义一个线程类,还可以获取返回值

    实现RunnableFuture接口可以定义一个线程类,还具有Future实现类的身份

    FutureTask类就是一个Runnable接口的实现类,并且可以通过构造注入的方式,获取到实现了Callable接口的线程,如下图所示
    在这里插入图片描述
    下面是FutureTask的基本用法

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    
    public class ThreadBaseDemo {
        public static void main(String[] args) {
            FutureTask<String> futureTask = new FutureTask<>(new MyThread());
    
            Thread t1 = new Thread(futureTask, "t1");
            t1.start();
            try {
                System.out.println(futureTask.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
    class MyThread implements Callable{
    
        @Override
        public Object call() throws Exception {
            System.out.println("-----come in call()");
            return "hello Callable";
        }
    }
    

    Future+线程池解决异步多线程任务,能够显著提高程序的执行效率,测试Demo如下

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    
    public class FutureThreadPoolDemo {
        public static void main(String[] args) {
            //3个任务,开启多个异步任务线程,请问耗时多少?
    
            long startTime = System.currentTimeMillis();
    
            ExecutorService threadPool = Executors.newFixedThreadPool(3);
    
            FutureTask<String> futureTask1 = new FutureTask<String>(() -> {
                try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
                return "task1 over";
            } );
            threadPool.submit(futureTask1);
            try {
                System.out.println(futureTask1.get());
                System.out.println(futureTask1.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
    
            FutureTask<String> futureTask2 = new FutureTask<String>(() -> {
                try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
                return "task2 over";
            } );
            threadPool.submit(futureTask2);
    
            try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
    
            threadPool.shutdown();
    
            long endTime = System.currentTimeMillis();
            System.out.println("-----costTime" + (endTime - startTime) + "ms");
    
            System.out.println(Thread.currentThread().getName()+"\t -----end");
        }
    
        private static void m1(){
            //3个任务,目前只有一个线程main来处理,请问耗时多少?
            long startTime = System.currentTimeMillis();
    
            //暂停毫秒
            try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
            try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
            try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); }
    
            long endTime = System.currentTimeMillis();
            System.out.println("-----costTime" + (endTime - startTime) + "ms");
    
            System.out.println(Thread.currentThread().getName()+"\t -----end");
        }
    }
    

    【缺点1】Futuretask的get()方法容易导致阻塞,如下所示

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.TimeUnit;
    
    public class FutureAPIDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            FutureTask<String> futureTask = new FutureTask<String>(() -> {
                System.out.println(Thread.currentThread().getName() + "\t -----come in");
                //暂停几秒钟线程
                try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
                return "task over";
            });
            Thread t1 = new Thread(futureTask, "t1");
            t1.start();
            //如果get()在主线程忙其他任务前,并且futureTask并没有完成,那么将会导致主线程在此阻塞,无法进行其他任务
            System.out.println(futureTask.get());
    
            System.out.println(Thread.currentThread().getName() + "\t -----忙其他任务了");
    
    		//想要避免这个问题,需要将get()方法放在后面
    		//System.out.println(futureTask.get());
        }
    }
    

    根本原因在于get()方法的逻辑是:只要调用了get()方法,那么就一定要等到futureTask完成

    如果我们希望get()方法人性化一点,如果futureTask没有完成,我们可以稍微等待一会儿,如果还是没有完成,我们可以不用阻塞,直接离开,那么我们应该怎么做呢?

    方法1:我们可以使用get()的重载方法get(long timeout, TimeUnit unit),超时将抛出异常,catch后再处理,虽然但是,还是不够优雅

    方法2:使用idDone()方法判断futureTask是否完成,如果想要异步获取结果,通常采用轮询的方式,而轮询的方式会导致CPU空转,而且也不见得能够及时地得到futureTask的结果

    while(true){
        if (futureTask.isDone()){
            System.out.println(futureTask.get());
            break;
        }else{
            try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
        }
    }
    

    3.CompletableFuture类

    从Java8开始引入了CompletableFuture,它是Future的功能增强版,减少阻塞和轮询可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法

    CompletableFuture提供了一种观察者模式类似的机制,可以让任务执行完成后通知监听的一方

    CompletableFuture同时实现了Future接口和CompletionStage接口

    CompletionStage接口

    • CompletionStage代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另一个阶段,有些类似Linux系统的管道分隔符传参数
    • 一个阶段的计算执行可以是一个Function、Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
    • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

    核心静态方法

    1.runAsync
    无返回值

    public static CompletableFuture<Void> runAsync(Runnable runnable)
    public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
    

    2.supply
    有返回值

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
    

    *供给型接口Supplier

    package java.util.function;
    
    /**
     * Represents a supplier of results.
     *  
     * There is no requirement that a new or disinct result be returned each
     * time the supplier is invoked.
     */
    @FunctionalInterface
    public interface Supplier<T>{
    	/**
    	 * Gets a result.
    	 */
    	T get();
    }
    
    import java.util.function.Supplier;
    
    public class Test{
    	public static void main(String args[]){
    		Supplier<String> s = () -> {
    			return "供给型接口";
    		};
    		System.out.println(s.get());
    	}
    }
    

    *Executor
    如果没有指定Executor,将直接使用默认的ForkJoinPool.commonPool()作为它的线程池去执行异步代码;
    如果指定线程池,则使用我们指定的线程池执行异步代码

    【案例一】无返回值

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureBuildDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
                System.out.println(Thread.currentThread().getName());
                //暂停几秒钟线程
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            });
    
            System.out.println(completableFuture.get());
        }
    }
    

    【案例二】指定线程池

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureBuildDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService threadPool = Executors.newFixedThreadPool(3);
    
            CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
                System.out.println(Thread.currentThread().getName());
                //暂停几秒钟线程
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            },threadPool);
    
            System.out.println(completableFuture.get());
    
    		threadPool.shutdown();
        }
    }
    

    【案例三】有返回值

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureBuildDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService threadPool = Executors.newFixedThreadPool(3);
    
            /*CompletableFuture completableFuture = CompletableFuture.runAsync(() -> {
                System.out.println(Thread.currentThread().getName());
                //暂停几秒钟线程
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            },threadPool);
    
            System.out.println(completableFuture.get());*/
    
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName());
                //暂停几秒钟线程
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                return "hello";
            },threadPool);
    
            System.out.println(completableFuture.get());
    
            threadPool.shutdown();
        }
    }
    

    【案例四】替代Future接口

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureUseDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "-----come in");
                int result = ThreadLocalRandom.current().nextInt(10);
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("-----1秒钟后出结果:" + result);
                return result;
            });
    
            System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
    
            System.out.println(completableFuture.get());
        }
    }
    

    【案例五】非阻塞get

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureUseDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "-----come in");
                int result = ThreadLocalRandom.current().nextInt(10);
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                System.out.println("-----1秒钟后出结果:" + result);
                return result;
            }).whenComplete((v,e) ->{
                if(e == null){
                    System.out.println("-----计算完成,更新系统UpdateValue:" + v);
                }
            }).exceptionally(e -> {
                e.printStackTrace();
                System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
                return null;
            });
    
            System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
    
            //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭,可以使用自定义线程池避免
            try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
        }
    
        private static void future1() throws ExecutionException, InterruptedException {
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "-----come in");
                int result = ThreadLocalRandom.current().nextInt(10);
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                System.out.println("-----1秒钟后出结果:" + result);
                return result;
            });
    
            System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
    
            System.out.println(completableFuture.get());
        }
    }
    

    【案例六】非阻塞get(自定义线程池)

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureUseDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            ExecutorService threadPool = Executors.newFixedThreadPool(3);
    
            try{
                CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                    System.out.println(Thread.currentThread().getName() + "-----come in");
                    int result = ThreadLocalRandom.current().nextInt(10);
                    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println("-----1秒钟后出结果:" + result);
                    return result;
                },threadPool).whenComplete((v,e) ->{
                    if(e == null){
                        System.out.println("-----计算完成,更新系统UpdateValue:" + v);
                    }
                }).exceptionally(e -> {
                    e.printStackTrace();
                    System.out.println("异常情况:" + e.getCause() + "\t" + e.getMessage());
                    return null;
                });
    
                System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                threadPool.shutdown();
            }
        }
    
        private static void future1() throws ExecutionException, InterruptedException {
            CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "-----come in");
                int result = ThreadLocalRandom.current().nextInt(10);
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                System.out.println("-----1秒钟后出结果:" + result);
                return result;
            });
    
            System.out.println(Thread.currentThread().getName() + "线程先去忙其他任务");
    
            System.out.println(completableFuture.get());
        }
    }
    

    CompletableFuture的优点

    • 异步任务结束后,会自动回调某个对象的方法
    • 主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行
    • 异步任务出错时,会自动回调某个对象的方法

    4.电商网站比价需求案例

    函数式接口回顾

    函数式接口名称方法名称参数返回值
    Runnablerun无参数无返回值
    Functionapply1个参数有返回值
    Consumeaccept1个参数无返回值
    Supplierget没有参数有返回值
    BiConsumeraccept2个参数无返回值

    join方法

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    
    public class CompletableFutureMallDemo {
        public static void main(String[] args) {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                return "hello";
            });
    
            //System.out.println(completableFuture.get());
            System.out.println(completableFuture.join());
        }
    }
    

    join的功能与get几乎一样,只是没有对异常进行处理时,join不会报检查型异常

    【需求说明】

    1. 同一款产品,同时搜索出同款产品在各大电商平台的售价
    2. 同一款产品,同时搜索出本产品在同一个电商平台下,各个入驻卖家售价是多少

    【输出返回】
    输出的结果希望是同款产品在不同地方的价格清单列表,返回一个List

    《Java入门到入土》 in JD price is 88.05
    《Java入门到入土》 in DangDang is 86.11
    《Java入门到入土》 in Taobao is 90.43
    

    【解决方案】

    1. step by step:按部就班,查完京东查当当,查完当当查淘宝
    2. all in:一口气多线程异步任务同时查询

    【技术要求】

    1. 函数式编程
    2. 链式编程
    3. Stream流式计算

    【代码实现】

    package com.sisyphus.CompletableFuture;
    
    import lombok.Getter;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.Collectors;
    
    public class CompletableFutureMallDemo {
    
        static List<NetMall> list = Arrays.asList(
                new NetMall("JD"),
                new NetMall("DangDang"),
                new NetMall("Taobao")
        );
    
        /**
         * @Description: 一家一家查询
         * List -----> map -----> List
         * @data:[list, productName]
         * @return: java.util.List
         * @Author: sisyphus
         * @Date: 2022-09-29 17:21:45
         */
        public static List<String> getPrice(List<NetMall> list, String productName){
            return list.
                    stream()
                    .map(netMall ->
                            String.format(productName + " in %s price is %.2f",
                                    netMall.getNetMallName(),
                                    netMall.calPrice(productName)))
                    .collect(Collectors.toList());
        }
    
        /**
         * @Description:
         * List -----> List> -----> List
         * @data:[list, productName]
         * @return: java.util.List
         * @Author: sisyphus
         * @Date: 2022-09-29 18:04:45
         */
        public static List<String> getPriceByCompletableFuture(List<NetMall> list, String productName){
            return list
                    .stream()
                    .map(netMall ->
                            CompletableFuture.supplyAsync(() ->
                                    String.format(productName + " in %s price is %.2f",
                                    netMall.getNetMallName(),
                                    netMall.calPrice(productName))))
                    .collect(Collectors.toList())
                    .stream()
                    .map(s ->
                            s.join())
                    .collect(Collectors.toList());
        }
    
        public static void main(String[] args) {
            long startTime1 = System.currentTimeMillis();
            List<String> list1 = getPrice(list, "《Java从入门到入土》");
            for (String element : list1){
                System.out.println(element);
            }
            long endTime1 = System.currentTimeMillis();
            System.out.println("-----costTime:" + (endTime1 - startTime1) + "毫秒");
    
            System.out.println("----------");
    
            long startTime2 = System.currentTimeMillis();
            List<String> list2 = getPriceByCompletableFuture(list, "《Java从入门到入土》");
            for (String element : list1){
                System.out.println(element);
            }
            long endTime2 = System.currentTimeMillis();
            System.out.println("-----costTime:" + (endTime2 - startTime2) + "毫秒");
        }
    }
    
    class NetMall{
        @Getter
        private String netMallName;
    
        public NetMall(String netMallName) {
            this.netMallName = netMallName;
        }
    
        public double calPrice(String productName){
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
            return ThreadLocalRandom.current().nextDouble() * 2 + productName.charAt(0);
        }
    }
    

    5.CompletableFuture常用方法

    获得结果和触发计算
    【获取结果】

    • public T get():获取不到结果,线程会阻塞
    • public T get(long timeout, TimeUnit unit):设置超时时间,超时就抛异常
    • public T join():与get()类似,只是不需要处理ExecutionException和InterruptedException
    • public T getNow(T valueIfAbsent):如果现在能获取结果,则返回结果,如果不能,则返回valueIfAbsent
    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class CompletableFutureAPIDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
                return "abc";
            });
    
    //        System.out.println(completableFuture.get());
    //        System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
    //        System.out.println(completableFuture.join());
            System.out.println(completableFuture.getNow("xxx"));
        }
    }
    

    【主动触发计算】

    • public boolean complete(T value):如果计算未完成则中断计算,并将计算结果设置为value,返回true;如果计算完成,则直接返回false
    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class CompletableFutureAPIDemo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
                try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
                return "abc";
            });
    
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
    
    //        System.out.println(completableFuture.get());
    //        System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
    //        System.out.println(completableFuture.join());
            //System.out.println(completableFuture.getNow("xxx"));
            System.out.println(completableFuture.complete("completeValue") + "\t" + completableFuture.join());
    
        }
    }
    

    对计算结果进行处理

    • thenApply():计算结果存在依赖关系,任务之间是串行的;由于存在依赖关系,如果当前步骤有异常则会终止后续步骤
    • handle():计算结果存在依赖关系,任务之间是串行的;即使出异常了,但是由于可以携带异常参数,不会终止后续步骤
    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureAPI2Demo {
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newFixedThreadPool(3);
    
            CompletableFuture.supplyAsync(() -> {
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                System.out.println("111");
                return 1;
            },threadPool).handle((f,e) -> {
                int i = 10/0;
                System.out.println("222");
                return f + 2;
            }).handle((f,e) -> {
                System.out.println("333");
                return f + 3;
            }).whenComplete((v,e) -> {
                if (e == null){
                    System.out.println("-----计算结果:" + v);
                }
            }).exceptionally(e -> {
                e.printStackTrace();
                System.out.println(e.getMessage());
                return null;
            });
    
            System.out.println(Thread.currentThread().getName() + "-----主线程先去忙其他任务");
    
            threadPool.shutdown();
        }
    }
    

    对计算结果进行消费

    • thenAccept():接收任务的处理结果,并消费处理,无返回结果
    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureAPI3Demo {
        public static void main(String[] args) {
            CompletableFuture.supplyAsync(() -> {
                return 1;
            }).thenApply(f -> {
                return f + 2;
            }).thenApply(f -> {
                return f + 3;
            }).thenAccept(r -> {
                System.out.println(r);
            });
    
            try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
        }
    }
    

    【任务之间的顺序执行关系】

    1. thenRun(Runnable runnable):任务 A 执行完,执行 B,并且 B 不需要 A 的结果
    2. thenAccept(Consumer action):任务 A 执行完执行 B,B需要 A 的结果,但是任务 B 无返回值
    3. thenApply(Function fn):任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值
    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureAPI3Demo {
        public static void main(String[] args) {
    //        CompletableFuture.supplyAsync(() -> {
    //            return 1;
    //        }).thenApply(f -> {
    //            return f + 2;
    //        }).thenApply(f -> {
    //            return f + 3;
    //        }).thenAccept(r -> {
    //            System.out.println(r);
    //        });
    
            CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join();
            CompletableFuture.supplyAsync(() -> "resultA").thenAccept(r -> System.out.println(r)).join();
            CompletableFuture.supplyAsync(() -> "resultA").thenApply(r -> r + "resultB").join();
        }
    }
    

    CompletableFuture 和线程池说明

    我们发现,CompletableFuture的thenRun()、thenApply()、thenAcccept()方法都有Async方法,以及参数带线程池的方法,接下来我们将用代码探讨一下它们之间的区别
    在这里插入图片描述
    【thenRun】

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureWithThreadPoolDemo {
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newFixedThreadPool(5);
    
            try{
                CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
                    try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println("1号任务" + "\t" + Thread.currentThread().getName());
                    return "abcd";
                }).thenRun(() -> {
                    try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println("2号任务" + "\t" + Thread.currentThread().getName());
                }).thenRun(() -> {
                    try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println("3号任务" + "\t" + Thread.currentThread().getName());
                }).thenRun(() -> {
                    try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println("4号任务" + "\t" + Thread.currentThread().getName());
                });
    
                System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                threadPool.shutdown();
            }
        }
    }
    

    所有任务都使用ForkJoin线程池

    【thenRunAsync;指定线程池】

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureWithThreadPoolDemo {
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newFixedThreadPool(5);
    
            try{
                CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
                    try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println("1号任务" + "\t" + Thread.currentThread().getName());
                    return "abcd";
                },threadPool).thenRunAsync(() -> {
                    try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println("2号任务" + "\t" + Thread.currentThread().getName());
                }).thenRun(() -> {
                    try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println("3号任务" + "\t" + Thread.currentThread().getName());
                }).thenRun(() -> {
                    try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println("4号任务" + "\t" + Thread.currentThread().getName());
                });
    
                System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                threadPool.shutdown();
            }
        }
    }
    

    除了第一个任务使用自定义线程池,其他任务都使用ForkJoin线程池

    【thenRunAsync;指定线程池;取消1号任务等待时间】

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureWithThreadPoolDemo {
        public static void main(String[] args) {
            ExecutorService threadPool = Executors.newFixedThreadPool(5);
    
            try{
                CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
                    //try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println("1号任务" + "\t" + Thread.currentThread().getName());
                    return "abcd";
                },threadPool).thenRun(() -> {
                    try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println("2号任务" + "\t" + Thread.currentThread().getName());
                }).thenRun(() -> {
                    try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println("3号任务" + "\t" + Thread.currentThread().getName());
                }).thenRun(() -> {
                    try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
                    System.out.println("4号任务" + "\t" + Thread.currentThread().getName());
                });
    
                System.out.println(completableFuture.get(2L, TimeUnit.SECONDS));
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                threadPool.shutdown();
            }
        }
    }
    

    除了第一个任务使用自定义线程池,其他任务都使用main线程

    总结

    1. 没有传入自定义线程池,都将采用默认线程池ForkJoinPool
    2. 传入了自定义线程池:调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用一个线程池;调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
    3. 有可能处理太快,系统优化切换原则,直接使用main线程

    对计算结果进行选用

    谁快就使用谁的结果

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureFstDemo {
        public static void main(String[] args) {
            CompletableFuture<String> playerA = CompletableFuture.supplyAsync(() -> {
                System.out.println("A come in");
                try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                return "playerA";
            });
    
            CompletableFuture<String> playerB = CompletableFuture.supplyAsync(() -> {
                System.out.println("B come in");
                try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
                return "playerB";
            });
    
            CompletableFuture<String> result = playerA.applyToEither(playerB,f -> {
                return f + " is winner";
            });
    
            System.out.println(Thread.currentThread().getName() + "\t" + "-----" + result.join());
        }
    }
    

    对计算结果进行合并
    两个CompletionStage任务都完成后,最终把两个任务的结果一起交给thenCombine来处理,先完成的会等待其他分支任务完成

    package com.sisyphus.CompletableFuture;
    
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    
    public class CompletableFutureCombineDemo {
        public static void main(String[] args) {
            CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "\t -----启动");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 10;
            });
    
            CompletableFuture<Integer> completableFutur2 = CompletableFuture.supplyAsync(() -> {
                System.out.println(Thread.currentThread().getName() + "\t -----启动");
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return 20;
            });
            CompletableFuture result = completableFuture1.thenCombine(completableFutur2,(x,y) -> {
                System.out.println("开始两个结果合并");
                return x+y;
            });
    
            System.out.println(result.join());
        }
    }
    
  • 相关阅读:
    @Slf4j打印异常信息打印出堆栈信息
    SpringBoot基于AOP实现RocketMQ发送与消费
    [架构之路-228]:计算机硬件与体系结构 - 硬盘存储结构原理:如何表征0和1,即如何存储0和1,如何读数据,如何写数据(修改数据)
    DBeaver(其他可视化工具一样的逻辑)连接IoTDBDriver教程
    Burp Suite Professional 2024.6 for macOS x64 & ARM64 - 领先的 Web 渗透测试软件
    【Python机器学习】零基础掌握ShrunkCovariance协方差估计
    使用eslint + prettier + husky + lint-staged 约束项目的最佳实践!
    数据的标准化处理——基于python
    动动嘴即可搜视频 抖音与Siri达成合作
    合宙ESP32C3 更换Flash调整固件教程分享
  • 原文地址:https://blog.csdn.net/qq_45593575/article/details/126427720