如何解决解释在 RxJava 和 Kotlin 协程中扫描 + 发布到参与源的区别
我正在将一段代码从 Rx 移植到协程,并遇到了我无法理解的行为。
背景:假设您有一个值流,每个值都与稍后要执行的 action-lambda 相关联。还有一个“外部”流,lambda 可以将其发布到该流中,并将其合并到结果流中。
我将原始代码简化为这个更简单(但仍然很棘手)的版本:
val shared = PublishSubject.create<String>()
Observable
.merge(
subject.map { it to { } },Observable.just("item1","item2")
.map { it to { shared.onNext(it.toupperCase()) } }
)
.scan("*" to { }) { accumulator,value ->
(accumulator.first + "_" + value.first) to value.second
}
.subscribe {
it.second()
println("got ${it.first}")
}
这个打印
got *
got *_item1
got *_item1_ITEM1
got *_item1_ITEM1_item2
got *_item1_ITEM1_item2_ITEM2
接下来我有这个协同程序+基于流的版本。
显着的区别在于它向 lambda 添加了 suspend
修饰符(以便能够调用 shared.emit()
.
runBlocking {
val shared = MutableSharedFlow<String>()
merge(
shared.map { it to suspend {} },flowOf("item1","item2").map { it to suspend { shared.emit(it.toupperCase()) } }
)
.scan("*" to suspend { }) { accumulator,value ->
(accumulator.first + "_" + value.first) to value.second
}
.collect {
it.second()
println("got ${it.first}")
}
}
这个打印
got *
got *_item1
got *_item1_item2
got *_item1_item1_ITEM1
got *_item1_item2_ITEM1_ITEM2
请注意,在 Rx 版本中,大写的 ITEM 发射散布着小写的发射,而在协程版本中,它们排在最后。
我想问的问题:
- 为什么会发生这种情况?是由于暂停的 lambda 吗?如果有复杂的事情发生,将不胜感激分步解释
- Rx 是否有一些内部缓冲区可以让它像现在一样运行?
- 能否使用 Flow 实现类似的行为?如果可以,如何实现?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。