微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

为什么 subscribeOn 对 Rxjava 中的 PublishSubject 没有影响?

如何解决为什么 subscribeOn 对 Rxjava 中的 PublishSubject 没有影响?

这是我在 Kotlin 中的测试代码

fun main() {
    rxjava()
}

fun rxjava() {
    val queuSubject = PublishSubject.create<String>()
    queuSubject
        .map { t ->
            val a = t.toLong()
            Thread.sleep(6000 / a)
            println("map $a called ${Thread.currentThread().name} ")
            a
        }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe({
            println("thread in subscription ${Thread.currentThread().name}")
        },{
            println("error ${it.message}")
        })
    for (i in 1..3) {
        Thread {
            queuSubject.onNext("$i")
        }.start()
    }
    Thread.sleep(15000)
}

我正在尝试在不同的 IO 线程中运行 map 块和 subscribe's onNext 块。但是输出是这样的:

map 3 called Thread-2 
thread in subscription RxCachedThreadScheduler-2
map 2 called Thread-1 
thread in subscription RxCachedThreadScheduler-2
map 1 called Thread-0 
thread in subscription RxCachedThreadScheduler-2

如您所见,调用 subscribeOn 似乎对 PublishSubject's 流没有影响,而 thread-0,thread-1 and thread-2 指的是调用 onNext 方法的线程。

另外考虑以下代码

fun main() {
    rxjava()
}

fun rxjava() {
    val queuSubject = PublishSubject.create<String>()
    queuSubject
        .map { t ->
            val a = t.toLong()
            Thread.sleep(6000 / a)
            println("map $a called ${Thread.currentThread().name} ")
            a
        }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe({
            println("thread in subscription ${Thread.currentThread().name}")
        },{
            println("error ${it.message}")
        })
    queuSubject.onNext("1")
    queuSubject.onNext("2")
    queuSubject.onNext("3")
    Thread.sleep(15000)
}

我写了上面的代码,看到没有输出输出。但是如果我从流中删除 subscribeOn,消息会按如下顺序打印:

map 1 called main 
thread in subscription RxCachedThreadScheduler-1
map 2 called main 
thread in subscription RxCachedThreadScheduler-1
map 3 called main 
thread in subscription RxCachedThreadScheduler-1

这些代码有什么问题?谢谢。

解决方法

因为 subscribeOn 只影响源的订阅副作用。如果源在观察者订阅时立即开始发出事件,则会产生这种副作用:

Observable.just(1,2,3)
.subscribeOn(Schedulers.io())
.doOnNext(v -> System.out.println(Thread.currentThread() + " - " + v)
.blockingSubscribe();

PublishSubject 没有订阅副作用,因为它只将信号从其 onXXX 方法中继到观察者的 onXXX 方法。

然而,subscribeOn 具有时间效应,因为它延迟了对源的实际订阅,因此在 PublishSubject 的情况下,它可能无法及时看到已注册的观察者,其他线程调用其 {{ 1}} 方法。

如果您想将处理移出原始线程,请使用 onXXX:

observeOn

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。