背景:有需要用到异步编程的,需要响应式回调,不是单线程不好,redis就很棒。根据实际情况选择,这篇文章 介绍了CompletableFuture,Future和RxJava的Observable之间的区别可以看看。
部分不带后缀Async的和带的区别在于是否开启新线程处理结果,根据具体实际情况选用。
只演示有返回值的,没返回值的不掩饰。
supplyAsync创建一个带返回值的异步任务。thenApplyAsync某个任务执行完执行的回调。exceptionally 某个任务执行异常时执行的回调方法。handleAsync某个任务执行完成后执行的回调方法,可以处理异常。thenCombine两个任务都正常执行完才执行后面任务。applyToEither只要其中一个执行完了就会执行某个任务。allOf多个任务都执行完成后才会执行。anyOf谁先执行完就持有谁。supplyAsync示例:runAsync功能类似只是没返回值不推荐
@GetMapping("/supplyAsync")
public String supplyAsync() throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName());
long l = System.currentTimeMillis();
CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
System.out.println("supplyAsync--->"+Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return 1.2;
});
//获取结果
System.out.println("最终结果:"+String.valueOf(f1.get()));
System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
return "success";
}
输出:
可以看到主线程和子线程在运行,主线程阻塞等待子线程获取结果,不需要结果不调用get()方法即可实现异步。
最终耗时2秒也就是主线程在等待子线程的2秒。
http-nio-8080-exec-1
supplyAsync--->ForkJoinPool.commonPool-worker-1
最终结果:1.2
耗时:2
thenApplyAsync示例可以在任务后面链式使用,thenApply,thenAccept,thenRun 功能类似不推荐,要么没入参要么没返回值
入参是上一个任务的返回值,出参和上一个任务的返回值类型一样。
可以写多个thenApplyAsync来处理后续任务。
@GetMapping("/supplyAsync")
public String supplyAsync() throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName());
long l = System.currentTimeMillis();
CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
System.out.println("supplyAsync--->"+Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return 1.2;
}).thenApplyAsync(result->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thenApplyAsync->"+Thread.currentThread().getName());
return result*2;
}).thenApplyAsync(result->{
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("thenApplyAsync->"+Thread.currentThread().getName());
return result*2;
});
//获取结果
System.out.println("最终结果:"+String.valueOf(f1.get()));
System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
return "success";
}
输出:
supplyAsync返回值是1.2,经过thenApplyAsync后续处理返回翻倍,最终返回4.8。
thenApplyAsync第一个任务和supplyAsync共用一个线程,第二个新开一个线程这就是后缀加Async作用。
这是一个链路,整体耗时6秒,每一个子线程按照顺序执行,三个线程分别是2秒合计6秒。
http-nio-8080-exec-1
supplyAsync--->ForkJoinPool.commonPool-worker-1
thenApplyAsync->ForkJoinPool.commonPool-worker-1
thenApplyAsync->ForkJoinPool.commonPool-worker-1
最终结果:4.8
耗时:6
exceptionally示例只有前置任务发生异常才会走这里。
当前置第一个任务发生异常后走这里,其他后续任务不走。
返回和前置任务一样类型的数据。
@GetMapping("/supplyAsync")
public String supplyAsync() throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName());
long l = System.currentTimeMillis();
CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
System.out.println("supplyAsync--->"+Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
int a = 1/0;
return 1.2;
}).thenApplyAsync(result->{
System.out.println("thenApplyAsync--->"+Thread.currentThread().getName());
int a = 1/0;
return result*2;
}).exceptionally(p->{
System.out.println("exceptionally--->"+Thread.currentThread().getName());
return -0.1;
});
//获取结果
System.out.println("最终结果:"+String.valueOf(f1.get()));
System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
return "success";
}
输出:
supplyAsync发生异常,跳过thenApplyAsync不执行,直接走exceptionally。
http-nio-8080-exec-1
supplyAsync--->ForkJoinPool.commonPool-worker-1
exceptionally--->ForkJoinPool.commonPool-worker-1
最终结果:-0.1
耗时:2
whenCompleteAsync不演示某个任务执行完成后执行的回调方法,接收2个参数,第一个是上一个任务的返回结果,第二个是异常,没返回值不演示。
handleAsync示例跟whenComplete基本一致,区别在于handleAsync的回调方法有返回值,且返回的结果可以自己定义与上一个任务中的泛型没关系。
thenApplyAsync在这里抛出异常,handleAsync这里处理异常。
@GetMapping("/supplyAsync")
public String supplyAsync() throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName());
long l = System.currentTimeMillis();
CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
System.out.println("supplyAsync--->"+Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return 1.2;
}).thenApplyAsync(result->{
System.out.println("thenApplyAsync--->"+Thread.currentThread().getName());
int a = 1/0;
return result*2;
});
CompletableFuture<String> f2 = f1.handleAsync((a, b) -> {
System.out.println("handleAsync--->" + Thread.currentThread().getName());
System.out.println("a--->"+a);
System.out.println("b--->"+b);
return "error";
});
System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
//获取结果
System.out.println("最终结果:"+String.valueOf(f2.get()));
return "success";
}
输出:
报错之后,handleAsync获取不到之前的返回值,只能获取到异常信息,handleAsync中的和返回值。
http-nio-8080-exec-1
supplyAsync--->ForkJoinPool.commonPool-worker-1
thenApplyAsync--->ForkJoinPool.commonPool-worker-1
handleAsync--->ForkJoinPool.commonPool-worker-1
a--->null
b--->java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
最终结果:error
耗时:2
thenCombine示例和thenAcceptBoth,runAfterBoth都是将两个CompletableFuture组合起来,只有都正常执行完才执行后面任务。区别在于,thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
@GetMapping("/supplyAsync")
public String supplyAsync() throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName());
long l = System.currentTimeMillis();
CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
System.out.println("supplyAsync1--->"+Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return 1.2;
});
CompletableFuture<Double> f2 = CompletableFuture.supplyAsync(()->{
System.out.println("supplyAsync2--->"+Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
int a = 2/0;
return 1.8;
}).handle((a,b)->{
if(b!=null){
return 2.3;
}else{
return 3.3;
}
});
CompletableFuture<Double> f4 = f1.thenCombineAsync(f2, (a, b) -> {
System.out.println("thenCombineAsync1--->"+Thread.currentThread().getName());
System.out.println("a--->"+a);
System.out.println("b--->"+b);
return a+b;
});
//获取结果
System.out.println("最终结果:"+String.valueOf(f4.get()));
System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
return "success";
}
输出:
f2出现异常用handle处理,最终thenCombineAsync拿到返回结果。
耗时2秒,两个子线程一个是1秒一个是2秒,一起执行,最慢完成结束才结束。
http-nio-8080-exec-2
supplyAsync1--->ForkJoinPool.commonPool-worker-1
supplyAsync2--->ForkJoinPool.commonPool-worker-2
thenCombineAsync1--->ForkJoinPool.commonPool-worker-1
a--->1.2
b--->2.3
最终结果:3.5
耗时:2
applyToEither 示例同acceptEither , runAfterEither都是将两个CompletableFuture组合起来,看谁跑得快就拿谁,其区别在于applyToEither有入参,有返回值;acceptEither方法入参,没有返回值;runAfterEither没方法入参,也没返回值。两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
@GetMapping("/supplyAsync")
public String supplyAsync() throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName());
long l = System.currentTimeMillis();
CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
System.out.println("supplyAsync1--->"+Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return 1.2;
});
CompletableFuture<Double> f2 = CompletableFuture.supplyAsync(()->{
System.out.println("supplyAsync2--->"+Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
int a = 2/0;
return 1.8;
}).handle((a,b)->{
if(b!=null){
return 2.3;
}else{
return 3.3;
}
}).applyToEither(f1,a->a);
//获取结果
System.out.println("最终结果:"+String.valueOf(f2.get()));
System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
return "success";
}
输出:
f2执行速度快返回2.3
耗时1秒,谁先结束直接结束
http-nio-8080-exec-1
supplyAsync1--->ForkJoinPool.commonPool-worker-1
supplyAsync2--->ForkJoinPool.commonPool-worker-2
最终结果:2.3
耗时:1
allOf示例所有任务都结束才结束
@GetMapping("/supplyAsync")
public String supplyAsync() throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName());
long l = System.currentTimeMillis();
CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
System.out.println("supplyAsync1--->"+Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return 1.2;
});
CompletableFuture<Double> f2 = CompletableFuture.supplyAsync(()->{
System.out.println("supplyAsync2--->"+Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return 1.8;
});
CompletableFuture.allOf(f1,f2);
//获取结果
System.out.println("最终结果:"+String.valueOf(f1.get()));
System.out.println("最终结果:"+String.valueOf(f2.get()));
System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
return "success";
}
输出:
耗时2秒,等待所有任务完成才结束
http-nio-8080-exec-2
supplyAsync1--->ForkJoinPool.commonPool-worker-1
supplyAsync2--->ForkJoinPool.commonPool-worker-2
最终结果:1.2
最终结果:1.8
耗时:2
anyOf示例返回最先执行完的任务
@GetMapping("/supplyAsync")
public String supplyAsync() throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName());
long l = System.currentTimeMillis();
CompletableFuture<Double> f1 = CompletableFuture.supplyAsync(()->{
System.out.println("supplyAsync1--->"+Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return 1.2;
});
CompletableFuture<Double> f2 = CompletableFuture.supplyAsync(()->{
System.out.println("supplyAsync2--->"+Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
return 1.8;
});
CompletableFuture<Object> f3 = CompletableFuture.anyOf(f1, f2);
//获取结果
System.out.println("最终结果:"+String.valueOf(f3.get()));
System.out.println("耗时:"+(System.currentTimeMillis()-l)/1000);
return "success";
}
输出
耗时1秒优先返回耗时最短的任务
http-nio-8080-exec-1
supplyAsync1--->ForkJoinPool.commonPool-worker-1
supplyAsync2--->ForkJoinPool.commonPool-worker-2
最终结果:1.8
耗时:1