微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

合并后如何保留两个 observable 发出的项目的顺序?

如何解决合并后如何保留两个 observable 发出的项目的顺序?

我遇到了 Scala Observables 的一种让我感到惊讶的行为。考虑我下面的例子:

object ObservablesDemo extends App {

  val oFast = Observable.interval(3.seconds).map(n => s"[FAST] ${n*3}")
  val oslow = Observable.interval(7.seconds).map(n => s"[SLOW] ${n*7}")
  val oBoth = (oFast merge oslow).take(8)

  oBoth.subscribe(println(_))
  oBoth.toBlocking.toIterable.last

}

代码演示了从两个 observable 发射元素。其中一个以“慢”的方式(每 7 秒)发出它的元素,另一个以“快”的方式(每 3 秒)发出它的元素。为了这个问题,假设我们想使用 map 函数定义这些可观察量,并适当地映射 interval 中的数字,如上所示(而不是另一种可能的方法,它会发出以相同的速率从两个 observable 中取出项目,然后根据需要filter输出)。

代码输出对我来说似乎违反直觉:

[FAST] 0
[FAST] 3
[SLOW] 0
[FAST] 6
[FAST] 9   <-- HERE
[SLOW] 7   <-- HERE
[FAST] 12
[FAST] 15

有问题的部分是 [FAST] observable 在 9 observable 发出 [SLOW] 之前发出 7。我希望 79 之前发出,因为在第七秒发出的任何内容都应该在第九秒发出的内容之前。

我应该如何修改代码以实现预期的行为?我查看了 RxScala 文档并开始搜索不同的 interval 函数Scheduler 类等主题,但我不确定它是否是搜索答案的正确位置。

解决方法

这看起来应该是这样的。这里列出了秒数和事件。如果 RXScala 中可用,您可以使用 GENERATED ALWAYS AS IDENTITYTestObserver 进行验证。 RXScala 于 2019 年停产,因此请记住这一点。

TestScheduler

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。