• Reactor 之 多任务并发执行,结果按顺序返回第一个


    1 场景

    调用多个平级服务,按照服务优先级返回第一个有效数据。

    具体case:一个页面可能有很多的弹窗,弹窗之间又有优先级。每次只需要返回第一个有数据的弹窗。但是又希望所有弹窗之间的数据获取是异步的。这种场景使用 Reactor 怎么实现呢?

    2 创建 service

    2.1 创建基本接口和实体类

    public interface TestServiceI {
    	Mono request();
    }
    
    • 1
    • 2
    • 3

    提供一个 request 方法,返回一个 Mono 对象。

    @Data
    @ToString
    @AllArgsConstructor
    @NoArgsConstructor
    public class TestUser {
    	private String name;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.2 创建 service 实现

    @Slf4j
    public class TestServiceImpl1 implements TestServiceI {
    	@Override
    	public Mono request() {
    		log.info("execute.test.service1");
    		return Mono.fromSupplier(() -> {
    					try {
    						System.out.println("service1.threadName=" + Thread.currentThread().getName());
    						Thread.sleep(500);
    					} catch (InterruptedException e) {
    						throw new RuntimeException(e);
    					}
    					return "";
    				})
    				.map(name -> {
    					return new TestUser(name);
    				});
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    第一个 service 执行耗时 500ms。返回空对象;

    创建第二个 service 执行耗时 1000ms。返回空对象;代码如上,改一下sleep时间即可。

    继续创建第三个 service 执行耗时 1000ms。返回 name3。代码如上,改一下 sleep 时间,以及返回为 name3。

    3 主体方法

    public static void main(String[] args) {
    		long startTime = System.currentTimeMillis();
    		TestServiceI testServiceImpl4 = new TestServiceImpl4();
    		TestServiceI testServiceImpl5 = new TestServiceImpl5();
    		TestServiceI testServiceImpl6 = new TestServiceImpl6();
    		List<TestServiceI> serviceIList = new ArrayList<>();
    		serviceIList.add(testServiceImpl4);
    		serviceIList.add(testServiceImpl5);
    		serviceIList.add(testServiceImpl6);
    
        // 执行 service 列表,这样有多少个 service 都可以
    		Flux<Mono<TestUser>> monoFlux = Flux.fromIterable(serviceIList)
    				.map(service -> {
    					return service.request();
    				});
    
        // flatMap(或者flatMapSequential) + map 实现异常继续下一个执行
    		Flux flux = monoFlux.flatMapSequential(mono -> {
    			return mono.map(user -> {
    						TestUser testUser = JsonUtil.parseJson(JsonUtil.toJson(user), TestUser.class);
    						if (Objects.nonNull(testUser) && StringUtils.isNotBlank(testUser.getName())) {
    							return testUser;
    						}
                // null 在 reactor 中是异常数据。
    						return null;
    					})
    					.onErrorContinue((err, i) -> {
    						log.info("onErrorContinue={}", i);
    					});
    		});
    		Mono mono = flux.elementAt(0, Mono.just(""));
    		Object block = mono.block();
    		System.out.println(block + "blockFirst 执行耗时ms:" + (System.currentTimeMillis() - startTime));
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    1、Flux.fromIterable 执行 service 列表,可以随意增删 service 服务。

    2、flatMap(或者flatMapSequential) + map + onErrorContinue 实现异常继续下一个执行。具体参考:

    Reactor 之 onErrorContinue 和 onErrorResume

    3、Mono mono = flux.elementAt(0, Mono.just(“”)); 返回第一个正常数据。

    执行输出:

    20:54:26.512 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
    20:54:26.553 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
    service1.threadName=main
    20:54:27.237 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
    20:54:27.237 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
    service5.threadName=main
    20:54:28.246 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
    20:54:28.246 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
    service6.threadName=main
    TestUser(name=name3)blockFirst 执行耗时ms:2895
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    1、service1 和 service2 因为返回空,所以继续下一个,最终返回 name3。

    2、查看总耗时:2895ms。service1 耗时 500,service2 耗时1000,service3 耗时 1000。发现耗时基本上等于 service1 + service2 + service3 。这是怎么回事呢?查看返回执行的线程,都是 main。

    总结:这样实现按照顺序返回第一个正常数据。但是执行并没有异步。下一步:如何实现异步呢?

    4 实现异步

    4.1 subcribeOn 实现异步

    修改 service 实现。增加 .subscribeOn(Schedulers.boundedElastic())

    如下:

    @Slf4j
    public class TestServiceImpl1 implements TestServiceI {
    	@Override
    	public Mono request() {
    		log.info("execute.test.service1");
    		return Mono.fromSupplier(() -> {
    					try {
    						System.out.println("service1.threadName=" + Thread.currentThread().getName());
    						Thread.sleep(500);
    					} catch (InterruptedException e) {
    						throw new RuntimeException(e);
    					}
    					return "";
    				})
    				//增加subscribeOn
    				.subscribeOn(Schedulers.boundedElastic())
    				.map(name -> {
    					return new TestUser(name);
    				});
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    再次执行输出如下:

    21:02:04.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
    21:02:04.265 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
    service4.threadName=boundedElastic-1
    21:02:04.300 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
    21:02:04.302 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
    service2.threadName=boundedElastic-2
    service3.threadName=boundedElastic-3
    21:02:04.987 [boundedElastic-1] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
    21:02:05.307 [boundedElastic-2] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
    TestUser(name=name6)blockFirst 执行耗时ms:1242
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    1、发现具体实现 sleep 的线程都不是 main 线程,而是 boundedElastic

    2、最终执行耗时 1242ms,只比执行时间最长的 service2 和 service3 耗时 1000ms,多一些。证明是异步了。

    4.2 CompletableFuture 实现异步

    修改 service 实现,使用 CompletableFuture 执行耗时操作(这里是sleep,具体到项目中可能是外部接口调用,DB 操作等);然后使用 Mono.fromFuture 返回 Mono 对象。

    @Slf4j
    public class TestServiceImpl1 implements TestServiceI{
    	@Override
    	public Mono request() {
    		log.info("execute.test.service1");
    		CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
    			try {
    				System.out.println("service1.threadName=" + Thread.currentThread().getName());
    				Thread.sleep(500);
    			} catch (InterruptedException e) {
    				throw new RuntimeException(e);
    			}
    			return "testname1";
    		});
    
    		return Mono.fromFuture(uCompletableFuture).map(name -> {
    			return new TestUser(name);
    		});
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    执行返回如下:

    21:09:59.465 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
    21:09:59.510 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
    service2.threadName=ForkJoinPool.commonPool-worker-1
    21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
    service3.threadName=ForkJoinPool.commonPool-worker-2
    21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1 
    service1.threadName=ForkJoinPool.commonPool-worker-3
    21:10:00.526 [ForkJoinPool.commonPool-worker-1] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
    21:10:00.538 [ForkJoinPool.commonPool-worker-2] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
    TestUser(name=testname1)blockFirst 执行耗时ms:1238
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    1、耗时操作都是使用 ForkJoinPool 线程池中的线程执行。

    2、最终耗时和方法1基本差不多。

    大家都去试试吧~

    相关链接:

    Reactor 之 onErrorContinue 和 onErrorResume

    Reactor 之 flatMap vs map 详解

  • 相关阅读:
    FCOS论文复现:通用物体检测算法
    java学习笔记.md版本
    app小程序手机端Python爬虫实战13-fiddler如何抓取手机端数据包
    2.X版本的一个通病问题
    互联网之所以无法成为金融科技的唯一,是有关键原因的
    乐歌智能升降桌、乐歌智能健身椅,为精英生活助力
    第二届“移动云杯”大赛行业赛道(行业应用创新子赛道)赛题密码,请速速转发!...
    Webpack & 理解 input & output 概念
    系统性能调优:提升服务器响应速度
    Renderbus瑞云渲染现在支持3dsMax 2024了
  • 原文地址:https://blog.csdn.net/Prepared/article/details/126510132