微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

解释在 RxJava 和 Kotlin 协程中扫描 + 发布到参与源的区别

如何解决解释在 RxJava 和 Kotlin 协程中扫描 + 发布到参与源的区别

我正在将一段代码从 Rx 移植到协程,并遇到了我无法理解的行为。

背景:假设您有一个值流,每个值都与稍后要执行的 action-lambda 相关联。还有一个“外部”流,lambda 可以将其发布到该流中,并将其合并到结果流中。

我将原始代码简化为这个更简单(但仍然很棘手)的版本:

val shared = PublishSubject.create<String>()
Observable
  .merge(
    subject.map { it to { } },Observable.just("item1","item2")
      .map { it to { shared.onNext(it.toupperCase()) } }
  )
  .scan("*" to { }) { accumulator,value ->
    (accumulator.first + "_" + value.first) to value.second
  }
  .subscribe {
    it.second()
    println("got ${it.first}")
  }

这个打印

got *
got *_item1
got *_item1_ITEM1
got *_item1_ITEM1_item2
got *_item1_ITEM1_item2_ITEM2

接下来我有这个协同程序+基于流的版本。 显着的区别在于它向 lambda 添加suspend 修饰符(以便能够调用 shared.emit().

runBlocking {
  val shared = MutableSharedFlow<String>()
  merge(
    shared.map { it to suspend {} },flowOf("item1","item2").map { it to suspend { shared.emit(it.toupperCase()) } }
  )
    .scan("*" to suspend { }) { accumulator,value ->
      (accumulator.first + "_" + value.first) to value.second
    }
    .collect {
      it.second()
      println("got ${it.first}")
    }
}

这个打印

got *
got *_item1
got *_item1_item2
got *_item1_item1_ITEM1
got *_item1_item2_ITEM1_ITEM2

请注意,在 Rx 版本中,大写的 ITEM 发射散布着小写的发射,而在协程版本中,它们排在最后。

我想问的问题:

  1. 为什么会发生这种情况?是由于暂停的 lambda 吗?如果有复杂的事情发生,将不胜感激分步解释
  2. Rx 是否有一些内部缓冲区可以让它像现在一样运行?
  3. 能否使用 Flow 实现类似的行为?如果可以,如何实现?

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。