如何解决发现第一个结果时RxJava中断并行执行
我正在并行运行一些服务。任务是在所有服务完成执行后不要等待。
Observable.just(2,3,5)
.map(delay -> serviceReturningSingleWithDelay(delay))
.toList()
.flatMap(list ->
Single.zip(list,output -> Arrays.stream(output)
.map(delay -> (Integer) delay)
.filter(delay -> delay == 3)
.findFirst()
.orElse(0)
))
.subscribe(System.out::println);
private Single<Integer> serviceReturningSingleWithDelay(Integer delay) {
return Single.just(delay)
.delay(delay,TimeUnit.SECONDS)
.doOnSuccess(s -> System.out.printf("Delay %d: Thread : %s \n",delay,Thread.currentThread().getName()));
}
现在我的输出是:
Delay 2: Thread : RxcomputationThreadPool-1
Delay 3: Thread : RxcomputationThreadPool-2
Delay 5: Thread : RxcomputationThreadPool-3
3
期望的结果是获取过滤值-在 RxcomputationThreadPool-3 线程完成执行之前3。 我将感谢您的任何想法。
解决方法
如果要并行运行它们并在收到值3时退出,则无需使用zip
。宁可使用takeWhile
来中断您的可观察对象,如下所示:
Observable.just(2,3,5)
.flatMapSingle(this::serviceReturningSingleWithDelay)
.takeWhile(e -> e != 3)
.subscribe(System.out::println);
如果要使用3
值,请使用takeUntil(e -> e == 3)
而不是takeWhile(e -> e != 3)
基于@bubbles答案的另一种方法。如果您想保留flatmap
并保留在Kotlin
fun main(args: Array<String>) {
Observable.just(5L,8L,10L)
.flatMap {
serviceReturningSingleWithDelay(it).toObservable()
}
.takeWhile { delay -> delay != 8L }
.subscribeBy(
onNext = { println("Delay period $it")}
)
Thread.sleep(10000)
}
private fun serviceReturningSingleWithDelay(delay: Long): Single<Long> {
return Single.just(delay)
.delay(delay,TimeUnit.SECONDS)
.doOnSuccess {
println("Delay $it Thread ${Thread.currentThread().name}")
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。