微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

传递覆盖 onSubscribe 的订阅者时调用 dispose()

如何解决传递覆盖 onSubscribe 的订阅者时调用 dispose()

我是 RxJava 的新手,如果我理解正确,Observer 会在 disposable 上传onSubscribe,因此如果 dispose() 已经被调用了。
我创建了以下代码

@NonNull Observable<Long> src = Observable.interval(1,TimeUnit.SECONDS);
src.subscribe(new Observer<Long>() {
      private disposable d;

      @Override
      public void onSubscribe(@NonNull disposable d) {
           this.d = d;
      }

      @Override
      public void onNext(@NonNull Long aLong) {
           if(!d.isdisposed()) {
              System.out.println("Number onNext = " + aLong);
           }
      }

       @Override
       public void onError(@NonNull Throwable e) {

       }

       @Override
       public void onComplete() {
           System.out.println("completed");
       }
 });

但我不知道如何为该订阅调用 dispose()subscribeObserver 作为参数传递返回 void 并且 subscribeWith 不接受我的 Observer 而没有编译错误

这应该如何工作?我在这里误解了什么?

解决方法

ObservableJavaDocs 有一个简单的例子:

Disposable d = Observable.just("Hello world!")
     .delay(1,TimeUnit.SECONDS)
     .subscribeWith(new DisposableObserver<String>() {
         @Override public void onStart() {
             System.out.println("Start!");
         }
         @Override public void onNext(String t) {
             System.out.println(t);
         }
         @Override public void onError(Throwable t) {
             t.printStackTrace();
         }
         @Override public void onComplete() {
             System.out.println("Done!");
         }
     });

 Thread.sleep(500);
 // the sequence can now be disposed via dispose()
 d.dispose();

编辑

以下示例是从 Disposable 方法中获取 onSubscribe 但通常不推荐的方法:

// field in the owner class
Disposable disposable;

public void doReactive() {
    Observable<Long> src = Observable.interval(1,TimeUnit.SECONDS);
    src.subscribe(new Observer<Long>() {

        @Override
        public void onSubscribe(@NonNull Disposable d) {
           disposable = d;
        }

        // ...
    });
}

public void cleanup() {
   if (disposable != null) {
       disposable.dispose();
       disposable = null;
   }
}

SerialDisposable sd = new SerialDisposable();

Observable<Long> src = Observable.interval(1,TimeUnit.SECONDS);
    src.subscribe(new Observer<Long>() {

        @Override
        public void onSubscribe(@NonNull Disposable d) {
           sd.set(d);
        }

        // ...
    });

// ...

sd.dispose();
,

您可以使用 DisposableObserver,它在您完成观察后可以轻松处理。

@NonNull Observable<Long> src = Observable.interval(1,TimeUnit.SECONDS);
    src.subscribe(new DisposableObserver<Long>() {
        @Override
        public void onNext(@NotNull Long aLong) {
            //Do anything you want to do..
            dispose();
        }

        @Override
        public void onError(@NotNull Throwable e) {
            //Handle the errors here..
            dispose();
        }

        @Override
        public void onComplete() {
            dispose();
        }
    });

您还可以使用 CompositeDisposable 一次处理多个观察者,有关更多详细信息,请查看。

https://www.tutorialspoint.com/rxjava/rxjava_compositedisposable.htm

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。