Reactor 之 多任务并发执行,结果按顺序返回第一个

1 场景

调用多个平级服务,按照服务优先级返回第一个有效数据。

具体case:一个页面可能有很多的弹窗,弹窗之间又有优先级。每次只需要返回第一个有数据的弹窗。但是又希望所有弹窗之间的数据获取是异步的。这种场景使用 Reactor 怎么实现呢?

2 创建 service

2.1 创建基本接口和实体类

public interface TestServiceI {
	Mono request();
}

提供一个 request 方法,返回一个 Mono 对象。

@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class TestUser {
	private String name;
}

2.2 创建 service 实现

@Slf4j
public class TestServiceImpl1 implements TestServiceI {
	@Override
	public Mono request() {
		log.info("execute.test.service1");
		return Mono.fromSupplier(() -> {
					try {
						System.out.println("service1.threadName=" + Thread.currentThread().getName());
						Thread.sleep(500);
					} catch (InterruptedException e) {
						throw new RuntimeException(e);
					}
					return "";
				})
				.map(name -> {
					return new TestUser(name);
				});
	}
}

第一个 service 执行耗时 500ms。返回空对象;

创建第二个 service 执行耗时 1000ms。返回空对象;代码如上,改一下sleep时间即可。

继续创建第三个 service 执行耗时 1000ms。返回 name3。代码如上,改一下 sleep 时间,以及返回为 name3。

3 主体方法

public static void main(String[] args) {
		long startTime = System.currentTimeMillis();
		TestServiceI testServiceImpl4 = new TestServiceImpl4();
		TestServiceI testServiceImpl5 = new TestServiceImpl5();
		TestServiceI testServiceImpl6 = new TestServiceImpl6();
		List<TestServiceI> serviceIList = new ArrayList<>();
		serviceIList.add(testServiceImpl4);
		serviceIList.add(testServiceImpl5);
		serviceIList.add(testServiceImpl6);

    // 执行 service 列表,这样有多少个 service 都可以
		Flux<Mono<TestUser>> monoFlux = Flux.fromIterable(serviceIList)
				.map(service -> {
					return service.request();
				});

    // flatMap(或者flatMapSequential) + map 实现异常继续下一个执行
		Flux flux = monoFlux.flatMapSequential(mono -> {
			return mono.map(user -> {
						TestUser testUser = JsonUtil.parseJson(JsonUtil.toJson(user), TestUser.class);
						if (Objects.nonNull(testUser) && StringUtils.isNotBlank(testUser.getName())) {
							return testUser;
						}
            // null 在 reactor 中是异常数据。
						return null;
					})
					.onErrorContinue((err, i) -> {
						log.info("onErrorContinue={}", i);
					});
		});
		Mono mono = flux.elementAt(0, Mono.just(""));
		Object block = mono.block();
		System.out.println(block + "blockFirst 执行耗时ms:" + (System.currentTimeMillis() - startTime));
	}

1、Flux.fromIterable 执行 service 列表,可以随意增删 service 服务。

2、flatMap(或者flatMapSequential) + map + onErrorContinue 实现异常继续下一个执行。具体参考:

Reactor 之 onErrorContinue 和 onErrorResume

3、Mono mono = flux.elementAt(0, Mono.just(“”)); 返回第一个正常数据。

执行输出:

20:54:26.512 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:54:26.553 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service1.threadName=main
20:54:27.237 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
20:54:27.237 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
service5.threadName=main
20:54:28.246 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
20:54:28.246 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service6.threadName=main
TestUser(name=name3)blockFirst 执行耗时ms:2895

1、service1 和 service2 因为返回空,所以继续下一个,最终返回 name3。

2、查看总耗时:2895ms。service1 耗时 500,service2 耗时1000,service3 耗时 1000。发现耗时基本上等于 service1 + service2 + service3 。这是怎么回事呢?查看返回执行的线程,都是 main。

总结:这样实现按照顺序返回第一个正常数据。但是执行并没有异步。下一步:如何实现异步呢?

4 实现异步

4.1 subcribeOn 实现异步

修改 service 实现。增加 .subscribeOn(Schedulers.boundedElastic())

如下:

@Slf4j
public class TestServiceImpl1 implements TestServiceI {
	@Override
	public Mono request() {
		log.info("execute.test.service1");
		return Mono.fromSupplier(() -> {
					try {
						System.out.println("service1.threadName=" + Thread.currentThread().getName());
						Thread.sleep(500);
					} catch (InterruptedException e) {
						throw new RuntimeException(e);
					}
					return "";
				})
				//增加subscribeOn
				.subscribeOn(Schedulers.boundedElastic())
				.map(name -> {
					return new TestUser(name);
				});
	}
}

再次执行输出如下:

21:02:04.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:02:04.265 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service4.threadName=boundedElastic-1
21:02:04.300 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
21:02:04.302 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service2.threadName=boundedElastic-2
service3.threadName=boundedElastic-3
21:02:04.987 [boundedElastic-1] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
21:02:05.307 [boundedElastic-2] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
TestUser(name=name6)blockFirst 执行耗时ms:1242

1、发现具体实现 sleep 的线程都不是 main 线程,而是 boundedElastic

2、最终执行耗时 1242ms,只比执行时间最长的 service2 和 service3 耗时 1000ms,多一些。证明是异步了。

4.2 CompletableFuture 实现异步

修改 service 实现,使用 CompletableFuture 执行耗时操作(这里是sleep,具体到项目中可能是外部接口调用,DB 操作等);然后使用 Mono.fromFuture 返回 Mono 对象。

@Slf4j
public class TestServiceImpl1 implements TestServiceI{
	@Override
	public Mono request() {
		log.info("execute.test.service1");
		CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
			try {
				System.out.println("service1.threadName=" + Thread.currentThread().getName());
				Thread.sleep(500);
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
			return "testname1";
		});

		return Mono.fromFuture(uCompletableFuture).map(name -> {
			return new TestUser(name);
		});
	}
}

执行返回如下:

21:09:59.465 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:09:59.510 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
service2.threadName=ForkJoinPool.commonPool-worker-1
21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service3.threadName=ForkJoinPool.commonPool-worker-2
21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1 
service1.threadName=ForkJoinPool.commonPool-worker-3
21:10:00.526 [ForkJoinPool.commonPool-worker-1] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
21:10:00.538 [ForkJoinPool.commonPool-worker-2] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
TestUser(name=testname1)blockFirst 执行耗时ms:1238

1、耗时操作都是使用 ForkJoinPool 线程池中的线程执行。

2、最终耗时和方法1基本差不多。

大家都去试试吧~

相关链接:

Reactor 之 onErrorContinue 和 onErrorResume

Reactor 之 flatMap vs map 详解

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


学习编程是顺着互联网的发展潮流,是一件好事。新手如何学习编程?其实不难,不过在学习编程之前你得先了解你的目的是什么?这个很重要,因为目的决定你的发展方向、决定你的发展速度。
IT行业是什么工作做什么?IT行业的工作有:产品策划类、页面设计类、前端与移动、开发与测试、营销推广类、数据运营类、运营维护类、游戏相关类等,根据不同的分类下面有细分了不同的岗位。
女生学Java好就业吗?女生适合学Java编程吗?目前有不少女生学习Java开发,但要结合自身的情况,先了解自己适不适合去学习Java,不要盲目的选择不适合自己的Java培训班进行学习。只要肯下功夫钻研,多看、多想、多练
Can’t connect to local MySQL server through socket \'/var/lib/mysql/mysql.sock问题 1.进入mysql路径
oracle基本命令 一、登录操作 1.管理员登录 # 管理员登录 sqlplus / as sysdba 2.普通用户登录
一、背景 因为项目中需要通北京网络,所以需要连vpn,但是服务器有时候会断掉,所以写个shell脚本每五分钟去判断是否连接,于是就有下面的shell脚本。
BETWEEN 操作符选取介于两个值之间的数据范围内的值。这些值可以是数值、文本或者日期。
假如你已经使用过苹果开发者中心上架app,你肯定知道在苹果开发者中心的web界面,无法直接提交ipa文件,而是需要使用第三方工具,将ipa文件上传到构建版本,开...
下面的 SQL 语句指定了两个别名,一个是 name 列的别名,一个是 country 列的别名。**提示:**如果列名称包含空格,要求使用双引号或方括号:
在使用H5混合开发的app打包后,需要将ipa文件上传到appstore进行发布,就需要去苹果开发者中心进行发布。​
+----+--------------+---------------------------+-------+---------+
数组的声明并不是声明一个个单独的变量,比如 number0、number1、...、number99,而是声明一个数组变量,比如 numbers,然后使用 nu...
第一步:到appuploader官网下载辅助工具和iCloud驱动,使用前面创建的AppID登录。
如需删除表中的列,请使用下面的语法(请注意,某些数据库系统不允许这种在数据库表中删除列的方式):
前不久在制作win11pe,制作了一版,1.26GB,太大了,不满意,想再裁剪下,发现这次dism mount正常,commit或discard巨慢,以前都很快...
赛门铁克各个版本概览:https://knowledge.broadcom.com/external/article?legacyId=tech163829
实测Python 3.6.6用pip 21.3.1,再高就报错了,Python 3.10.7用pip 22.3.1是可以的
Broadcom Corporation (博通公司,股票代号AVGO)是全球领先的有线和无线通信半导体公司。其产品实现向家庭、 办公室和移动环境以及在这些环境...
发现个问题,server2016上安装了c4d这些版本,低版本的正常显示窗格,但红色圈出的高版本c4d打开后不显示窗格,
TAT:https://cloud.tencent.com/document/product/1340