如何解决合并后如何保留两个 observable 发出的项目的顺序?
我遇到了 Scala Observables 的一种让我感到惊讶的行为。考虑我下面的例子:
object ObservablesDemo extends App {
val oFast = Observable.interval(3.seconds).map(n => s"[FAST] ${n*3}")
val oslow = Observable.interval(7.seconds).map(n => s"[SLOW] ${n*7}")
val oBoth = (oFast merge oslow).take(8)
oBoth.subscribe(println(_))
oBoth.toBlocking.toIterable.last
}
该代码演示了从两个 observable 发射元素。其中一个以“慢”的方式(每 7 秒)发出它的元素,另一个以“快”的方式(每 3 秒)发出它的元素。为了这个问题,假设我们想使用 map
函数定义这些可观察量,并适当地映射 interval
中的数字,如上所示(而不是另一种可能的方法,它会发出以相同的速率从两个 observable 中取出项目,然后根据需要filter
输出)。
[FAST] 0
[FAST] 3
[SLOW] 0
[FAST] 6
[FAST] 9 <-- HERE
[SLOW] 7 <-- HERE
[FAST] 12
[FAST] 15
有问题的部分是 [FAST]
observable 在 9
observable 发出 [SLOW]
之前发出 7
。我希望 7
在 9
之前发出,因为在第七秒发出的任何内容都应该在第九秒发出的内容之前。
我应该如何修改代码以实现预期的行为?我查看了 RxScala 文档并开始搜索不同的 interval
函数和 Scheduler
类等主题,但我不确定它是否是搜索答案的正确位置。
解决方法
这看起来应该是这样的。这里列出了秒数和事件。如果 RXScala 中可用,您可以使用 GENERATED ALWAYS AS IDENTITY
和 TestObserver
进行验证。 RXScala 于 2019 年停产,因此请记住这一点。
TestScheduler
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。