• 如何使用CompletableFuture


    目录

    一、CompletableFuture是什么

    二、CompletableFuture用法

    2.1、创建CompletableFuture

    2.1.1、直接创建

    2.1.2、创建一个使用指定数据作为结果的已结束的CompletableFuture

     2.1.3、通过执行异步任务获取CompletableFuture

     2.2、获取任务结果

    2.3、消费结果

    2.3.1、whenComplete

    2.3.2、thenAccept

    2.3.3、thenApply

    2.3.4、thenRun

    2.3.5、handle

    2.3.6、exceptionally

    2.4、多任务编排

    2.4.1、 依赖(thenCompose)

    2.4.2、 AND(thenCombine,thenAcceptBoth,runAfterBoth)

    2.4.3、OR(applyToEither)

    2.4.4、 并行(allOf,anyOf)


    一、CompletableFuture是什么

    CompletableFuture是对Future的扩展和增强。CompletableFuture实现了Future接口和CompletionStage使其实现了对任务编排的能力,支持其在完成时触发相关功能或操作。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。

    二、CompletableFuture用法

    2.1、创建CompletableFuture

    2.1.1、直接创建

    CompletableFuture completableFuture=new CompletableFuture();

    2.1.2、创建一个使用指定数据作为结果的已结束的CompletableFuture

    CompletableFuture<String> test1 = CompletableFuture.completedFuture("test");

     2.1.3、通过执行异步任务获取CompletableFuture

    1. /**
    2. * 使用默认线程池执行异步任务,有返回值
    3. */
    4. CompletableFuture.supplyAsync(() -> "hello CompletableFuture!");
    5. /**
    6. * 使用指定线程池执行异步任务,有返回值
    7. */
    8. CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!", Executors.newCachedThreadPool());
    9. /**
    10. * 使用默认线程池执行异步任务,无返回值
    11. */
    12. CompletableFuture.runAsync(() -> System.out.println("Hello runAsync!"));
    13. /**
    14. * 使用指定线程池执行异步任务,无返回值
    15. */
    16. CompletableFuture.runAsync(() -> System.out.println("Hello RunAsync!"), Executors.newCachedThreadPool());

     2.2、获取任务结果

    方法是否阻塞是否抛出检查异常说明
    get()阻塞抛出检查异常
    getNow(V value)不阻塞不抛出如果任务没有结束就返回指定的默认值
    get(long timeout, TimeUnit unit)阻塞指定时间抛出
    join()阻塞不抛出

    示例代码: 

    1. /**
    2. * 获取任务结果,阻塞直到任务结束,会抛出检查异常
    3. */
    4. String test = CompletableFuture.supplyAsync(() -> "Hello").get();
    5. /**
    6. * 获取任务结果,如果超过等待之间任务未结束则抛出TimeoutException
    7. */
    8. test = CompletableFuture.supplyAsync(() -> "test").get(10, TimeUnit.MILLISECONDS);
    9. /**
    10. * 如果任务结束则返回任务结果,如果任务未结束则返回指定的默认值
    11. */
    12. test = CompletableFuture.supplyAsync(() -> "test").getNow("default");
    13. /**
    14. * 阻塞获取任务结果,和get相似,但是不会抛出检查异常
    15. */
    16. test = CompletableFuture.supplyAsync(() -> "join").join();

    2.3、消费结果

    2.3.1、whenComplete

    whenComplete是当某个任务执行完成后执行的回调方法,不管任务异常还是正常结束都会执行该回调;该回调的入参是任务的执行结果和异常;
    如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。

    方法说明
     whenComplete(BiConsumer action )在当前线程中同步执行回调操作
     whenCompleteAsync(BiConsumer action )在默认线程池中异步执行回调
     whenCompleteAsync(BiConsumer action,Executor executor )在指定线程池中异步执行回调

     示例代码如下:

    1. CompletableFuture.supplyAsync(() -> {
    2. System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName());
    3. return "whenComplete";
    4. }).whenComplete((a, e) ->
    5. System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName() + ", a: " + a)
    6. );
    7. CompletableFuture.supplyAsync(()->{throw new RuntimeException("");}).whenComplete((a,e)->System.out.println(e));

    执行结果如下:

    1. threadId: 11,threadName: ForkJoinPool.commonPool-worker-9
    2. threadId: 1,threadName: main, a: whenComplete
    3. java.util.concurrent.CompletionException: java.lang.RuntimeException:

    2.3.2、thenAccept

    thenAccept只消费正常处理结束的结果,不消费异常同时没有返回值

    方法说明
     thenAccept(Consumer action )在当前线程中消费同步消费
    thenAcceptAsync(Consumer action )在默认线程池中异步消费
    thenAcceptAsync(Consumer action,
        Executor executor )
    在指定线程池中异步消费
    1. CompletableFuture.supplyAsync(() -> "thenAccept")
    2. .thenAccept(str -> System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName()+",str="+str));
    3. CompletableFuture.supplyAsync(() -> "thenAcceptAsync")
    4. .thenAcceptAsync(str -> System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName()+",str="+str));
    5. CompletableFuture.supplyAsync(()->"thenAcceptAsync")
    6. .thenAcceptAsync(str-> System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName()+",str="+str),Executors.newSingleThreadExecutor());

    执行结果如下:

    1. threadId: 1,threadName: main,str=thenAccept
    2. threadId: 11,threadName: ForkJoinPool.commonPool-worker-9,str=thenAcceptAsync
    3. threadId: 16,threadName: pool-6-thread-1,str=thenAcceptAsync

    2.3.3、thenApply

    thenApply和thenAccept的相同点都是消费任务正常结束的结果,不同点就是thenAccept没有返回值而thenApply有返回值。

    1. CompletableFuture.supplyAsync(() -> "Dora")
    2. .thenApply(str ->"Hello, "+str).thenAccept(System.out::println);
    3. CompletableFuture.supplyAsync(() -> "Dora")
    4. .thenApplyAsync(str ->"Hello, "+str).thenAccept(System.out::println);
    5. CompletableFuture.supplyAsync(()->"Dora")
    6. .thenApplyAsync(str ->"Hello, "+str).thenAccept(System.out::println);

    执行结果如下:

    Hello, Dora
    Hello, Dora
    Hello, Dora 

    2.3.4、thenRun

    thenRun和thenAccept相似,都是在任务正常结束后执行且都没有返回值,两者的区别是thenAccept关心上一个任务的结果而thenRun不关心结果。

    1. CompletableFuture.supplyAsync(() -> "thenRun")
    2. .thenRun(() -> System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName()));
    3. CompletableFuture.supplyAsync(() -> "thenRunAsync")
    4. .thenRunAsync(() -> System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName()));
    5. CompletableFuture.supplyAsync(()->"thenRunAsync")
    6. .thenRunAsync(()-> System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName()),Executors.newSingleThreadExecutor());

    2.3.5、handle

        handle方法的使用和whenComplete类似

    handlewhenComplete
    入参任务的结果和异常任务的结果和异常
    触发阶段任务结束后,不管正常结束还是异常结束任务结束后,不管正常结束还是异常结束
    返回值有返回值Void

    2.3.6、exceptionally

    exceptionally是当任务执行出现异常时的回调处理,否则不会触发该回调;可以使用whenComplete和handle方法替代该方法

    1. //如果任务执行异常则返回"exception"
    2. CompletableFuture.supplyAsync(()->{throw new RuntimeException("");}).exceptionally((e->"exception")).whenComplete((a,e)->System.out.println(a+" "+e));

    执行结果如下:

    exception null

    2.4、多任务编排

    2.4.1、 依赖(thenCompose)

     thenCompose 是依赖前一个任务结果,前一个任务的结果作为当前任务的入参

    CompletableFuture.supplyAsync(() -> "hello").thenCompose(p -> CompletableFuture.supplyAsync(() -> p + " world")).thenAccept(System.out::println);

    2.4.2、 AND(thenCombine,thenAcceptBoth,runAfterBoth)

    thenCombine 是将两个任务的结果进行合并处理

    1. String test = CompletableFuture.supplyAsync(() -> {
    2. try {
    3. TimeUnit.SECONDS.sleep(1);
    4. } catch (InterruptedException e) {
    5. throw new RuntimeException(e);
    6. }
    7. return "a";
    8. }).thenCombineAsync(CompletableFuture.supplyAsync(() -> {
    9. try {
    10. TimeUnit.SECONDS.sleep(2);
    11. } catch (Exception e) {
    12. }
    13. return "b";
    14. }), (a, b) -> a + "_" + b).join();

    结果是:

    a_b 

    thenAcceptBoth和thenCombine比较类似,区别是thenCombine是有返回值,而thenAcceptBoth是消费前两个任务的结果没有返回值

    1. CompletableFuture.supplyAsync(() -> "hello").thenAcceptBoth(CompletableFuture.supplyAsync(() -> "world"), (a, b) -> {
    2. System.out.println(a + "—" + b);
    3. });

     结果是:

    a-b

    runAfterBoth和thenCombine、thenAcceptBoth有所不同,它不关心前两个任务的结果,只要前两个任务都结束就会触发第三个任务执行

    1. CompletableFuture.supplyAsync(() -> "hello").runAfterBoth(CompletableFuture.supplyAsync(() -> "world"), () -> {
    2. System.out.println("end");
    3. });

    结果是

    end

    2.4.3、OR(applyToEither)

    applyToEither从名字上就可以知道两个任务是或的关系,只要有一个任务结束就出发第三个任务执行。

    1. CompletableFuture.supplyAsync(() -> "first").applyToEither(CompletableFuture.supplyAsync(() -> "second"), str ->
    2. str + " end").thenAccept(System.out::println);

    结果是:

    first end 

    acceptEither是只要有一个任务结束第三个任务就消费它

    CompletableFuture.supplyAsync(() -> "first").acceptEither(CompletableFuture.supplyAsync(() -> "second"), str ->System.out.println(str+" end"));

    2.4.4、 并行(allOf,anyOf)

              allOf 是所有任务都必须执行结束,anyOf是有一个任务正常结束即可。
     

    CompletableFuture.allOf(CompletableFuture.supplyAsync(()->"a"),CompletableFuture.supplyAsync(()->"b")).join();

    三、实战示例

    3.1、RocketMQ中的代码示例

     3.1.1、thenCombine的使用        

            在发送消息过程中同步刷盘场景中需要将消息写入到磁盘中,同时还需要在集群内同步,只有这两个操作都成功才表示消息发送成功!以下代码就是相关处理逻辑:

    1. private CompletableFuture handleDiskFlushAndHA(PutMessageResult putMessageResult,
    2. MessageExt messageExt, int needAckNums, boolean needHandleHA) {
    3. //刷盘
    4. CompletableFuture flushResultFuture = handleDiskFlush(putMessageResult.getAppendMessageResult(), messageExt);
    5. CompletableFuture replicaResultFuture;
    6. //同步
    7. if (!needHandleHA) {
    8. replicaResultFuture = CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    9. } else {
    10. replicaResultFuture = handleHA(putMessageResult.getAppendMessageResult(), putMessageResult, needAckNums);
    11. }
    12. //刷盘和同步任务的结果作为第三个任务的输入,构建新的响应
    13. return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
    14. if (flushStatus != PutMessageStatus.PUT_OK) {
    15. putMessageResult.setPutMessageStatus(flushStatus);
    16. }
    17. if (replicaStatus != PutMessageStatus.PUT_OK) {
    18. putMessageResult.setPutMessageStatus(replicaStatus);
    19. }
    20. return putMessageResult;
    21. });
    22. }

    3.1.2、thenAcceptAsync的使用

             使用thenAcceptAsync完成异步发送成功后触发出发任务执行

    1. if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
    2. CompletableFuture asyncPutMessageFuture;
    3. if (sendTransactionPrepareMessage) {
    4. asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
    5. } else {
    6. asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
    7. }
    8. final int finalQueueIdInt = queueIdInt;
    9. final MessageExtBrokerInner finalMsgInner = msgInner;
    10. //异步写消息成功后触发
    11. asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
    12. RemotingCommand responseFuture =
    13. handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader, sendMessageContext,
    14. ctx, finalQueueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
    15. if (responseFuture != null) {
    16. doResponse(ctx, request, responseFuture);
    17. }
    18. sendMessageCallback.onComplete(sendMessageContext, response);
    19. }, this.brokerController.getPutMessageFutureExecutor());
    20. // Returns null to release the send message thread
    21. return null;
    22. }

  • 相关阅读:
    Vue框架(二)------quasar简介及初始化
    java毕业设计大学生数字云平台2021Mybatis+系统+数据库+调试部署
    java编程基础总结——24.Map接口及其实现子类
    《Python+Kivy(App开发)从入门到实践》自学笔记:高级UX部件——Spinner选择框
    [附源码]计算机毕业设计springboot病人跟踪治疗信息管理系统
    WIFI 万[néng]钥匙 v5.0.10/v4.9.80 SVIP版!
    手把手搭建Vue3+Vite项目模板
    (数据科学学习手札156)地图可视化神器kepler.gl 3.0版本发布
    SpringBoot3分库分表
    [python刷题模板] 0-1BFS
  • 原文地址:https://blog.csdn.net/zhangwei_david/article/details/127873400