目录
2.1.2、创建一个使用指定数据作为结果的已结束的CompletableFuture
2.1.3、通过执行异步任务获取CompletableFuture
2.4.2、 AND(thenCombine,thenAcceptBoth,runAfterBoth)
CompletableFuture是对Future的扩展和增强。CompletableFuture实现了Future接口和CompletionStage使其实现了对任务编排的能力,支持其在完成时触发相关功能或操作。借助这项能力,可以轻松地组织不同任务的运行顺序、规则以及方式。

CompletableFuture completableFuture=new CompletableFuture();
CompletableFuture<String> test1 = CompletableFuture.completedFuture("test");
- /**
- * 使用默认线程池执行异步任务,有返回值
- */
- CompletableFuture.supplyAsync(() -> "hello CompletableFuture!");
- /**
- * 使用指定线程池执行异步任务,有返回值
- */
- CompletableFuture.supplyAsync(() -> "Hello CompletableFuture!", Executors.newCachedThreadPool());
- /**
- * 使用默认线程池执行异步任务,无返回值
- */
- CompletableFuture.runAsync(() -> System.out.println("Hello runAsync!"));
- /**
- * 使用指定线程池执行异步任务,无返回值
- */
- CompletableFuture.runAsync(() -> System.out.println("Hello RunAsync!"), Executors.newCachedThreadPool());
| 方法 | 是否阻塞 | 是否抛出检查异常 | 说明 |
| get() | 阻塞 | 抛出检查异常 | |
| getNow(V value) | 不阻塞 | 不抛出 | 如果任务没有结束就返回指定的默认值 |
| get(long timeout, TimeUnit unit) | 阻塞指定时间 | 抛出 | |
| join() | 阻塞 | 不抛出 |
示例代码:
- /**
- * 获取任务结果,阻塞直到任务结束,会抛出检查异常
- */
- String test = CompletableFuture.supplyAsync(() -> "Hello").get();
- /**
- * 获取任务结果,如果超过等待之间任务未结束则抛出TimeoutException
- */
- test = CompletableFuture.supplyAsync(() -> "test").get(10, TimeUnit.MILLISECONDS);
- /**
- * 如果任务结束则返回任务结果,如果任务未结束则返回指定的默认值
- */
- test = CompletableFuture.supplyAsync(() -> "test").getNow("default");
-
- /**
- * 阻塞获取任务结果,和get相似,但是不会抛出检查异常
- */
- test = CompletableFuture.supplyAsync(() -> "join").join();
whenComplete是当某个任务执行完成后执行的回调方法,不管任务异常还是正常结束都会执行该回调;该回调的入参是任务的执行结果和异常;
如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致,如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。
| 方法 | 说明 |
| whenComplete(BiConsumer super T, ? super Throwable> action ) | 在当前线程中同步执行回调操作 |
| whenCompleteAsync(BiConsumer super T, ? super Throwable> action ) | 在默认线程池中异步执行回调 |
| whenCompleteAsync(BiConsumer super T, ? super Throwable> action,Executor executor ) | 在指定线程池中异步执行回调 |
示例代码如下:
- CompletableFuture.supplyAsync(() -> {
- System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName());
- return "whenComplete";
- }).whenComplete((a, e) ->
- System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName() + ", a: " + a)
- );
-
- CompletableFuture.supplyAsync(()->{throw new RuntimeException("");}).whenComplete((a,e)->System.out.println(e));
执行结果如下:
- threadId: 11,threadName: ForkJoinPool.commonPool-worker-9
- threadId: 1,threadName: main, a: whenComplete
- java.util.concurrent.CompletionException: java.lang.RuntimeException:
thenAccept只消费正常处理结束的结果,不消费异常同时没有返回值
| 方法 | 说明 |
| thenAccept(Consumer super T> action ) | 在当前线程中消费同步消费 |
| thenAcceptAsync(Consumer super T> action ) | 在默认线程池中异步消费 |
| thenAcceptAsync(Consumer super T> action, Executor executor ) | 在指定线程池中异步消费 |
- CompletableFuture.supplyAsync(() -> "thenAccept")
- .thenAccept(str -> System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName()+",str="+str));
- CompletableFuture.supplyAsync(() -> "thenAcceptAsync")
- .thenAcceptAsync(str -> System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName()+",str="+str));
- CompletableFuture.supplyAsync(()->"thenAcceptAsync")
- .thenAcceptAsync(str-> System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName()+",str="+str),Executors.newSingleThreadExecutor());
执行结果如下:
- threadId: 1,threadName: main,str=thenAccept
- threadId: 11,threadName: ForkJoinPool.commonPool-worker-9,str=thenAcceptAsync
- threadId: 16,threadName: pool-6-thread-1,str=thenAcceptAsync
thenApply和thenAccept的相同点都是消费任务正常结束的结果,不同点就是thenAccept没有返回值而thenApply有返回值。
- CompletableFuture.supplyAsync(() -> "Dora")
- .thenApply(str ->"Hello, "+str).thenAccept(System.out::println);
- CompletableFuture.supplyAsync(() -> "Dora")
- .thenApplyAsync(str ->"Hello, "+str).thenAccept(System.out::println);
- CompletableFuture.supplyAsync(()->"Dora")
- .thenApplyAsync(str ->"Hello, "+str).thenAccept(System.out::println);
执行结果如下:
Hello, Dora
Hello, Dora
Hello, Dora
thenRun和thenAccept相似,都是在任务正常结束后执行且都没有返回值,两者的区别是thenAccept关心上一个任务的结果而thenRun不关心结果。
- CompletableFuture.supplyAsync(() -> "thenRun")
- .thenRun(() -> System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName()));
- CompletableFuture.supplyAsync(() -> "thenRunAsync")
- .thenRunAsync(() -> System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName()));
- CompletableFuture.supplyAsync(()->"thenRunAsync")
- .thenRunAsync(()-> System.out.println("threadId: " + Thread.currentThread().getId() + ",threadName: " + Thread.currentThread().getName()),Executors.newSingleThreadExecutor());
handle方法的使用和whenComplete类似
| handle | whenComplete | |
| 入参 | 任务的结果和异常 | 任务的结果和异常 |
| 触发阶段 | 任务结束后,不管正常结束还是异常结束 | 任务结束后,不管正常结束还是异常结束 |
| 返回值 | 有返回值 | Void |
exceptionally是当任务执行出现异常时的回调处理,否则不会触发该回调;可以使用whenComplete和handle方法替代该方法
- //如果任务执行异常则返回"exception"
- CompletableFuture.supplyAsync(()->{throw new RuntimeException("");}).exceptionally((e->"exception")).whenComplete((a,e)->System.out.println(a+" "+e));
执行结果如下:
exception null
thenCompose 是依赖前一个任务结果,前一个任务的结果作为当前任务的入参
CompletableFuture.supplyAsync(() -> "hello").thenCompose(p -> CompletableFuture.supplyAsync(() -> p + " world")).thenAccept(System.out::println);
thenCombine 是将两个任务的结果进行合并处理
- String test = CompletableFuture.supplyAsync(() -> {
-
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- return "a";
- }).thenCombineAsync(CompletableFuture.supplyAsync(() -> {
- try {
- TimeUnit.SECONDS.sleep(2);
- } catch (Exception e) {
-
- }
- return "b";
- }), (a, b) -> a + "_" + b).join();
结果是:
a_b
thenAcceptBoth和thenCombine比较类似,区别是thenCombine是有返回值,而thenAcceptBoth是消费前两个任务的结果没有返回值
- CompletableFuture.supplyAsync(() -> "hello").thenAcceptBoth(CompletableFuture.supplyAsync(() -> "world"), (a, b) -> {
- System.out.println(a + "—" + b);
- });
结果是:
a-b
runAfterBoth和thenCombine、thenAcceptBoth有所不同,它不关心前两个任务的结果,只要前两个任务都结束就会触发第三个任务执行
- CompletableFuture.supplyAsync(() -> "hello").runAfterBoth(CompletableFuture.supplyAsync(() -> "world"), () -> {
- System.out.println("end");
- });
结果是
end
applyToEither从名字上就可以知道两个任务是或的关系,只要有一个任务结束就出发第三个任务执行。
- CompletableFuture.supplyAsync(() -> "first").applyToEither(CompletableFuture.supplyAsync(() -> "second"), str ->
- str + " end").thenAccept(System.out::println);
结果是:
first end
acceptEither是只要有一个任务结束第三个任务就消费它
CompletableFuture.supplyAsync(() -> "first").acceptEither(CompletableFuture.supplyAsync(() -> "second"), str ->System.out.println(str+" end"));
allOf 是所有任务都必须执行结束,anyOf是有一个任务正常结束即可。
CompletableFuture.allOf(CompletableFuture.supplyAsync(()->"a"),CompletableFuture.supplyAsync(()->"b")).join();
在发送消息过程中同步刷盘场景中需要将消息写入到磁盘中,同时还需要在集群内同步,只有这两个操作都成功才表示消息发送成功!以下代码就是相关处理逻辑:
- private CompletableFuture
handleDiskFlushAndHA(PutMessageResult putMessageResult, - MessageExt messageExt, int needAckNums, boolean needHandleHA) {
- //刷盘
- CompletableFuture
flushResultFuture = handleDiskFlush(putMessageResult.getAppendMessageResult(), messageExt); - CompletableFuture
replicaResultFuture; - //同步
- if (!needHandleHA) {
- replicaResultFuture = CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
- } else {
- replicaResultFuture = handleHA(putMessageResult.getAppendMessageResult(), putMessageResult, needAckNums);
- }
- //刷盘和同步任务的结果作为第三个任务的输入,构建新的响应
- return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
- if (flushStatus != PutMessageStatus.PUT_OK) {
- putMessageResult.setPutMessageStatus(flushStatus);
- }
- if (replicaStatus != PutMessageStatus.PUT_OK) {
- putMessageResult.setPutMessageStatus(replicaStatus);
- }
- return putMessageResult;
- });
- }
使用thenAcceptAsync完成异步发送成功后触发出发任务执行
- if (brokerController.getBrokerConfig().isAsyncSendEnable()) {
- CompletableFuture
asyncPutMessageFuture; - if (sendTransactionPrepareMessage) {
- asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
- } else {
- asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
- }
-
- final int finalQueueIdInt = queueIdInt;
- final MessageExtBrokerInner finalMsgInner = msgInner;
- //异步写消息成功后触发
- asyncPutMessageFuture.thenAcceptAsync(putMessageResult -> {
- RemotingCommand responseFuture =
- handlePutMessageResult(putMessageResult, response, request, finalMsgInner, responseHeader, sendMessageContext,
- ctx, finalQueueIdInt, beginTimeMillis, mappingContext, BrokerMetricsManager.getMessageType(requestHeader));
- if (responseFuture != null) {
- doResponse(ctx, request, responseFuture);
- }
- sendMessageCallback.onComplete(sendMessageContext, response);
- }, this.brokerController.getPutMessageFutureExecutor());
- // Returns null to release the send message thread
- return null;
- }