微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 Java Reactor 并行运行 cassandra 查询

如何解决使用 Java Reactor 并行运行 cassandra 查询

我在尝试并行运行多个 cassandra 查询时遇到问题(Spring 5、Spring-Data、Cassandra 4、datastax 驱动程序 4.9.0)。

这个示例代码没问题

Mono<String> mono1 = Mono.just("1").log().subscribeOn(Schedulers.boundedElastic());
Mono<String> mono2 = Mono.just("2").log().subscribeOn(Schedulers.boundedElastic());
Mono.zip(Arrays.asList(mono2,mono1),result -> result).block();

日志显示每个单声道都在其线程中执行(boundedElastic-X)

18:46:12.866 [boundedElastic-1] INFO reactor.Mono.Just.2 - | onSubscribe([Synchronous Fuseable] Operators.Scalarsubscription)
18:46:12.866 [boundedElastic-2] INFO reactor.Mono.Just.1 - | onSubscribe([Synchronous Fuseable] Operators.Scalarsubscription)
18:46:12.869 [boundedElastic-1] INFO reactor.Mono.Just.2 - | request(unbounded)
18:46:12.869 [boundedElastic-2] INFO reactor.Mono.Just.1 - | request(unbounded)
18:46:12.869 [boundedElastic-1] INFO reactor.Mono.Just.2 - | onNext(2)
18:46:12.869 [boundedElastic-2] INFO reactor.Mono.Just.1 - | onNext(1)
18:46:12.869 [boundedElastic-1] INFO reactor.Mono.Just.2 - | onComplete()
18:46:12.869 [boundedElastic-2] INFO reactor.Mono.Just.1 - | onComplete()

但是当我用 cassandra 查询替换我的示例 Mono 时,该查询使用 ReactiveCassandraRepository for ex 插入记录

Mono<Content> mono1 = cassandraOperations.insert(entity1).log().subscribeOn(Schedulers.boundedElastic());
Mono<Content> mono2 = cassandraOperations.insert(entity2).log().subscribeOn(Schedulers.boundedElastic());
Mono.zip(Arrays.asList(mono1,mono2),result -> result).block();

我得到这些日志:

2021-03-21 18:49:14.769  INFO 5664 --- [oundedElastic-1] reactor.Mono.MapFuseable.1 : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2021-03-21 18:49:14.769  INFO 5664 --- [oundedElastic-2] reactor.Mono.MapFuseable.2 : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2021-03-21 18:49:14.771  INFO 5664 --- [oundedElastic-2] reactor.Mono.MapFuseable.2 : | request(unbounded)
2021-03-21 18:49:14.771  INFO 5664 --- [oundedElastic-1] reactor.Mono.MapFuseable.1 : | request(unbounded)
2021-03-21 18:49:14.781  INFO 5664 --- [        s0-io-3] reactor.Mono.MapFuseable.2 : | onNext(com.acme.content.entity.Content@36cdb2c5)
2021-03-21 18:49:14.781  INFO 5664 --- [        s0-io-3] reactor.Mono.MapFuseable.2 : | onComplete()
2021-03-21 18:49:14.784  INFO 5664 --- [        s0-io-3] reactor.Mono.MapFuseable.1 : | onNext(com.acme.content.entity.Content@50d5f4a4)
2021-03-21 18:49:14.784  INFO 5664 --- [        s0-io-3] reactor.Mono.MapFuseable.1 : | onComplete()

OnSubscribe() 和 request() 方法按预期在专用线程 (oundedElastic-X) 内完成,但我很惊讶地看到 OnNext() 和 onComplete() 操作是在同一个线程内完成的(所以-io- 3).

我一定漏掉了一些东西,有人能解释一下两个样本中发生的差异吗?

谢谢!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。