如何解决传递覆盖 onSubscribe 的订阅者时调用 dispose()
我是 RxJava 的新手,如果我理解正确,Observer
会在 disposable
上传递 onSubscribe
,因此如果 dispose()
已经被调用了。
我创建了以下代码:
@NonNull Observable<Long> src = Observable.interval(1,TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
private disposable d;
@Override
public void onSubscribe(@NonNull disposable d) {
this.d = d;
}
@Override
public void onNext(@NonNull Long aLong) {
if(!d.isdisposed()) {
System.out.println("Number onNext = " + aLong);
}
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
System.out.println("completed");
}
});
但我不知道如何为该订阅调用 dispose()
。 subscribe
将 Observer
作为参数传递返回 void
并且 subscribeWith
不接受我的 Observer
而没有编译错误。
这应该如何工作?我在这里误解了什么?
解决方法
Observable
的 JavaDocs 有一个简单的例子:
Disposable d = Observable.just("Hello world!")
.delay(1,TimeUnit.SECONDS)
.subscribeWith(new DisposableObserver<String>() {
@Override public void onStart() {
System.out.println("Start!");
}
@Override public void onNext(String t) {
System.out.println(t);
}
@Override public void onError(Throwable t) {
t.printStackTrace();
}
@Override public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(500);
// the sequence can now be disposed via dispose()
d.dispose();
编辑
以下示例是从 Disposable
方法中获取 onSubscribe
但通常不推荐的方法:
// field in the owner class
Disposable disposable;
public void doReactive() {
Observable<Long> src = Observable.interval(1,TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable = d;
}
// ...
});
}
public void cleanup() {
if (disposable != null) {
disposable.dispose();
disposable = null;
}
}
或
SerialDisposable sd = new SerialDisposable();
Observable<Long> src = Observable.interval(1,TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
sd.set(d);
}
// ...
});
// ...
sd.dispose();
,
您可以使用 DisposableObserver,它在您完成观察后可以轻松处理。
@NonNull Observable<Long> src = Observable.interval(1,TimeUnit.SECONDS);
src.subscribe(new DisposableObserver<Long>() {
@Override
public void onNext(@NotNull Long aLong) {
//Do anything you want to do..
dispose();
}
@Override
public void onError(@NotNull Throwable e) {
//Handle the errors here..
dispose();
}
@Override
public void onComplete() {
dispose();
}
});
您还可以使用 CompositeDisposable 一次处理多个观察者,有关更多详细信息,请查看。
https://www.tutorialspoint.com/rxjava/rxjava_compositedisposable.htm
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。