public class SessionStore { Subject<Session,Session> subject; public SessionStore() { subject = new SerializedSubject<>(BehaviorSubject.create(new Session()); } public void set(Session session) { subject.onNext(session); } public Observable<UserSession> observe() { return subject.distinctUntilChanged(); } }
在活动中,我观察会话并对每次更改执行网络操作:
private Subscription init() { return sessionStore .observe() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .flatMap(new Func1<Session,Observable<Object>>() { @Override public Observable<Object> call(Session session) { return retrofitService.getAThing(); } }) .subscribe(...); }
当我订阅会话存储时,主题立即发布在io()上,因为它是BehavIoUrSubject,订阅者在mainThread()上执行.
当我在订阅它时调用sessionStore.set(new AnotherSession())时会出现问题. IMO应该在io()调度程序上执行init()中定义的流.但是,相反的是,流在调用subject.onNext()的同一线程上执行.因为我在flatMap()中进行网络操作而导致networkonmainthreadException.
我是否理解主题错了?我这样滥用他们吗?那么适当的解决方案是什么?
我也尝试用observe()方法中的Observable.fromEmitter()替换整个主题方法,但令人惊讶的是输出是一样的.
解决方法
默认情况下,在Subject上调用onNext()会直接传播到所有Observer的onNext()回调方法.这些方法具有相同的名称并不奇怪.在某种程度上,在Subject上调用onNext()会在每个Subscriber上间接调用onNext().
让我们回顾一下:
如果从Thread-1调用Subject上的onNext,它将从Thread-1调用onNext到订阅者. onSubscribe将被删除.
retrofitService.getAThing()
我会猜测,并说它是调用线程.这将是observeOn中描述的线程,它是Android-UI-Loop.
observeOn下的每个值都将从调度程序指定的Thread-a转移到Thread-b. UI-Loop上的observeOn应该在订阅之前发生.订阅中将接收的每个值都将位于UI-Loop上,这不会阻止UI线程或以异常结束.
class SessionStore { private Subject<String,String> subject; public SessionStore() { subject = BehaviorSubject.create("wurst").toSerialized(); } public void set(String session) { subject.onNext(session); } public Observable<String> observe() { return subject .asObservable() .doOnNext(s -> System.out.println("Receiving value on Thread:: " + Thread.currentThread())) .distinctUntilChanged(); } } @Test public void name() throws Exception { // init SessionStore sessionStore = new SessionStore(); TestSubscriber testSubscriber = new TestSubscriber(); Subscription subscribe = sessionStore .observe() .flatMap(s -> { return Observable.fromCallable(() -> { System.out.println("flatMap Thread:: " + Thread.currentThread()); return s; }).subscribeOn(Schedulers.io()); }) .doOnNext(s -> System.out.println("After flatMap Thread:: " + Thread.currentThread())) .observeOn(Schedulers.newThread()) // imagine AndroidScheduler here .subscribe(testSubscriber); // Do UI-Stuff in subscribe new Thread(() -> { System.out.println("set on Thread:: " + Thread.currentThread()); sessionStore.set("123"); }).start(); new Thread(() -> { System.out.println("set on Thread:: " + Thread.currentThread()); sessionStore.set("345"); }).start(); boolean b = testSubscriber.awaitValueCount(3,3_000,TimeUnit.MILLISECONDS); Assert.assertTrue(b); }
输出::
Receiving value on Thread:: Thread[main,5,main] flatMap Thread:: Thread[RxIoScheduler-2,main] After flatMap Thread:: Thread[RxIoScheduler-2,main] set on Thread:: Thread[Thread-1,main] set on Thread:: Thread[Thread-0,main] Receiving value on Thread:: Thread[Thread-1,main]
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。