• JUC-Future、CompletionService、CompletableFuture


    并发任务执行,取结果归集

    1. Future

    1.1. 代码示例

    public class FutureTest {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
    
            StopWatch watch = new StopWatch();
            watch.start();
    
            //线程池
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
    
            //结果集
            List<String> list = new ArrayList<>();
            List<Future<String>> listFuture = new ArrayList<>();
    
            for (int i = 0; i < 3; i++){
                int finalI = i;
                listFuture.add(executor.submit(() -> {
                    if (finalI == 0){
                        Thread.sleep(5000);
                    }else if(finalI == 1){
                        Thread.sleep(3000);
                    }else if(finalI ==2){
                        Thread.sleep(1000);
                    }
                    System.out.println(finalI);
                    return finalI+"";
                }));
            }
    
            //结果归集(结果处理)
            for (int i = 0; i < listFuture.size(); i++){
                Future<String> future = listFuture.get(i);
                while (true){
                    if (future.isDone()){
                        String s = future.get();
                        Thread.sleep(500);
                        list.add(s);
                        System.out.println("获取结果:"+s);
                        break;
                    }else{
                        //System.out.println("等待");
                        //Thread.sleep(1000);
                    }
                }
    
            }
            System.out.println("list="+list);
            watch.stop();
            System.out.println(watch.prettyPrint());
    
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    1.2. 运行结果

    在这里插入图片描述

    1.3. 结果分析

    返回的结果是[0, 1, 2],即使后边的任务先执行完,也要等之前加入的任务执行完才能拿到结果。如果对结果的处理也是一个非常费时的操作,那么在等待前面的任务执行完才能处理,这等待的时间就浪费掉了。

    2. CompletionService

    CompletionService是JDK8才有的,相对于之前版本的Future而言,CompletionService的优势是能后尽可能快的得到执行完成的任务。所以使用CompletionService能够降低总执行时间。

    2.1. 代码示例

    public class CompletionServiceTest {
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
    
            StopWatch watch = new StopWatch();
            watch.start();
    
            //线程池
            ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) ;
    
            CompletionService<String> completionService = new ExecutorCompletionService(executor);
    
            //结果集
            List<String> list = new ArrayList<>();
            List<Future<String>> listFuture = new ArrayList<>();
    
            for (int i = 0; i < 3; i++){
                int finalI = i;
                listFuture.add(completionService.submit(() -> {
                    if (finalI == 0){
                        Thread.sleep(5000);
                    }else if(finalI == 1){
                        Thread.sleep(3000);
                    }else if(finalI == 2){
                        Thread.sleep(1000);
                    }
                    System.out.println(finalI);
                    return finalI+"";
                }));
            }
    
    
            //结果归集(结果处理)
            for (int i = 0; i < 3; i++){
                String s = completionService.take().get();
                Thread.sleep(500);
                list.add(s);
                System.out.println("获取结果:"+s);
            }
    
            System.out.println("list="+list);
    
            watch.stop();
            System.out.println(watch.prettyPrint());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    2.2. 运行结果

    在这里插入图片描述

    2.3. 结果分析

    返回的结果是[2, 1, 0],可以清楚的看出谁先完成任务就先处理谁的结果。花费时间比Future短。

    2.4. 扩展

    CompletionService还适用于N选1的场景,例如同时从不同的渠道获取数据,当返回任何一个可用的结果即可。这种场景Future是实现不了的。

    2.5. CompletionService接口实现

    在这里插入图片描述

    3. CompletableFuture——异步编程

    3.1. CompletableFuture类图

    在这里插入图片描述

    3.2. 创建CompletableFuture对象

    //使用默认线程池
    static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
    
    //使用默认线程池
    static CompletableFuture<Void> runAsync(Runnable runnable)
    
    //可以指定线程池
    static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
    
    //可以指定线程池
    static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)                                          
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    根据不同的业务类型创建不同的线程池,以避免互相干扰

    3.3. 实现的Future和CompletionStage接口

    实现异步编程:串行关系、并行关系、汇聚关系。

    3.3.1. Future接口

    用来解决异步操作什么时候结束;以及获取异步操作的结果。

    3.3.2. CompletionStage接口

    3.3.2.1. 串行关系

    CompletionStage<R> thenApply(fn);
    CompletionStage<R> thenApplyAsync(fn);
    CompletionStage<Void> thenAccept(consumer);
    CompletionStage<Void> thenAcceptAsync(consumer);
    CompletionStage<Void> thenRun(action);
    CompletionStage<Void> thenRunAsync(action);
    CompletionStage<R> thenCompose(fn);
    CompletionStage<R> thenComposeAsync(fn);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.3.2.2. 汇聚关系AND

    CompletionStage<R> thenCombine(other, fn);
    CompletionStage<R> thenCombineAsync(other, fn);
    CompletionStage<Void> thenAcceptBoth(other, consumer);
    CompletionStage<Void> thenAcceptBothAsync(other, consumer);
    CompletionStage<Void> runAfterBoth(other, action);
    CompletionStage<Void> runAfterBothAsync(other, action);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.3.2.3. 汇聚关系OR

    CompletionStage applyToEither(other, fn);
    CompletionStage applyToEitherAsync(other, fn);
    CompletionStage acceptEither(other, consumer);
    CompletionStage acceptEitherAsync(other, consumer);
    CompletionStage runAfterEither(other, action);
    CompletionStage runAfterEitherAsync(other, action);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    3.3.2.4. 异常描述

    CompletionStage exceptionally(fn);
    CompletionStage<R> whenComplete(consumer);
    CompletionStage<R> whenCompleteAsync(consumer);
    CompletionStage<R> handle(fn);
    CompletionStage<R> handleAsync(fn);
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • 相关阅读:
    【数据结构-哈希表 一】【原地哈希】:缺失的第一个正整数
    istio系列:第五章-ServiceEntry内到外的通讯配置
    Python学习笔记之分支结构与循环结构
    nodejs在pdf中绘制表格
    《LeetCode力扣练习》代码随想录——字符串(KMP算法学习补充——针对next数组构建的回退步骤进行解释)
    [C/C++]数据结构 深入挖掘环形链表问题
    GEAR框架: Tractian的敏捷工程文化
    【UE】简单的半透明+描边效果
    加油站监控ai智能分析
    消防大数据平台建设解决方案
  • 原文地址:https://blog.csdn.net/hdn_kb/article/details/126002912