微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

java – Observable的doOnError正确位置

我对观察者来说是个新手,我仍在努力解决这些问题.我有以下代码
observableKafka.getRealTimeEvents()
        .filter(this::isTrackedAccount)
        .filter(e -> LedgerMapper.isDepositOrClosedTrade((Transaction) e.getPayload()))
        .map(ledgerMapper::mapLedgerTransaction)
        .map(offerCache::addTransaction)
        .filter(offer -> offer != null)  // Offer may have been removed from cache since last check
        .filter(Offer::isReady)
        .doOnError(throwable -> { 
              LOG.info("Exception thrown on realtime events");
          })
        .forEach(awardChecker::awardFailOrIgnore);

getRealTimeEvents()返回一个Observable< Event>.

.doOnError的位置是否重要?另外,在这代码添加多个调用会产生什么影响?我已经意识到我可以做到并且所有这些都被调用,但我不确定它的目的是什么.

解决方法

是的,它确实.当错误在特定点传递流时,doOnError会起作用,因此如果doOnError之前的操作符抛出,则会调用您的操作.但是,如果进一步放置doOnError,可能会也可能不会调用它,具体取决于链中的下游运算符.

特定

Observer<Object> ignore = new Observer<Object>() {
    @Override public void onCompleted() {
    }
    @Override public void onError(Throwable e) {
    }
    @Override public void onNext(Object t) {
    }
};

例如,以下代码将始终调用doOnError:

Observable.<Object>error(new Exception()).doOnError(e -> log(e)).subscribe(ignore);

但是,此代码不会:

Observable.just(1).doOnError(e -> log(e))
.flatMap(v -> Observable.<Integer>error(new Exception())).subscribe(ignore);

大多数操作符都会反弹源自下游的异常.

如果通过onErrorResumeNext或onExceptionResumeNext转换异常,则添加multipe doOnError是可行的:

Observable.<Object>error(new RuntimeException())
.doOnError(e -> log(e))
.onErrorResumeNext(Observable.<Object>error(new IllegalStateException()))
.doOnError(e -> log(e)).subscribe(ignore);

否则,您将在链的多个位置记录相同的异常.

原文地址:https://www.jb51.cc/java/126788.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐