微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在Spring Cloud Stream反应性使用者中遇到异常时,为什么会收到onComplete信号?

如何解决在Spring Cloud Stream反应性使用者中遇到异常时,为什么会收到onComplete信号?

我正在将Spring Reactor与Spring Cloud Stream(GCP Pub / Sub Binder)结合使用,并遇到错误处理问题。我可以通过一个非常简单的示例重现该问题:

@Bean
public Function<Flux<String>,Mono<Void>> consumer() {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}",msg))
        .map(msg -> {
            if (true) { 
                throw new RuntimeException("exception encountered!");
            }
            return msg;
        })
        .doOnError(throwable -> log.error("Failed to consume message",throwable))
        .then();
}

我期望的行为是看到“无法使用消息”打印,但是,这似乎没有发生。将.log()调用添加到链中时,我看到onNext / onComplete信号,我希望看到onError信号。

我的实际代码如下:

@Bean
public Function<Flux<CustomMessage>,Mono<Void>> consumer(MyService myService) {
    return flux -> flux
        .doOnNext(msg -> log.info("New message received: {}",msg))
        .flatMap(myService::processMessage) // exception happens deep in here
        .doOnError(throwable -> log.error("Failed to consume message",throwable))
        .then();
}

我注意到在服务类的深处,我试图对Reactor发布者进行错误处理。但是,使用Spring Cloud Stream时不会出现onError信号。如果我只是在单元测试中像这样myService.processMessage(msg)那样调用服务并模拟了异常,那么我的反应链将正确地传播错误信号。

当我连接到Spring Cloud Stream时,这似乎是一个问题。我想知道Spring Cloud Function / Stream是否正在做任何全局错误包装?

在我的平凡代码中,我确实注意到了此错误消息,这可能与为什么我没有收到错误信号有关?

ERROR --- onfiguration$FunctionToDestinationBinder : Failed to process the following content which will be dropped: ...

为了使我更加困惑,如果将Spring Cloud Stream绑定按如下方式切换到非反应式实现,那么我可以在反应式链中得到onError信号:

@Bean
public Consumer<CustomMessage> consumer(MyService myService) {
    return customMessage -> Mono.just(customMessage)
        .doOnNext(msg -> log.info("New message received: {}",throwable)) // prints successfully this time
        .subscribe();
}

解决方法

这就是我从自己的调查中收集的信息,也许这可能会对其他人有所帮助。预警,我可能没有使用正确的“ Spring Reactor语言”,但这就是我最终解决它的方式...

Hoxton.SR5中,一个管理流量订阅的onErrorContinue was included on the reactive bindingonErrorContinue的问题在于,它会通过在失败的运算符(如果支持)上应用BiConsumer函数来影响 上游 运算符。

这意味着,当我们的map / flatMap运算符发生错误时,onErrorContinue BiConsumer将启动并将下行信号修改为onComplete()({{ 1}}或Mono<T>(如果它从request(...)请求了新元素)。由于没有Flux<T>信号,导致我们的doOnError(...)操作符无法执行。

最终,SCS团队决定加入remove this error handling wrapperonError()不再有此Hoxton.SR6。但是,这意味着传播到SCS绑定的异常将导致Flux订阅被切断。由于没有订阅者,因此随后的消息将无处路由。

此错误处理已传递给客户端,我们向{em>内部发布者添加了onErrorContinue运算符以有效地删除错误信号。当onErrorResume发布者遇到错误时,onErrorResume会将发布者切换为作为参数传入的后备发布者,并从操作员链中的该点继续。在我们的情况下,该后备发布者仅返回myService::processMessage,这使我们能够丢弃错误信号,同时仍允许内部错误处理机制运行,同时也不会影响外部源发布者。

Mono.empty()示例/说明

可以通过一个非常简单的示例来说明上述技术。

onErrorResume

上面的Flux.just(1,2,3) .flatMap(i -> i == 2 ? Mono.error(new RuntimeException("error") : Mono.just(i)) .onErrorResume(t -> Flux.just(4,5,6)) .doOnNext(i -> log.info("Element: {}",i)) .subscribe(); 将输出以下内容:

Flux<Integer>

由于在元素Element: 1 Element: 4 Element: 5 Element: 6 上遇到错误,因此2会进入后备状态,并且新发布者有效地从后备状态恢复为onErrorResume 。在我们的情况下,我们不想影响来源发布者(即Flux.just(4,6))。我们只想删除错误的元素(Flux.just(1,3)),然后继续下一个元素(2)。

我们不能简单地将3更改为Flux.just(4,6)Flux.empty()

Mono.empty()

这将导致输出以下内容:

Flux.just(1,3)
    .flatMap(i -> i == 2
        ? Mono.error(new RuntimeException("error")
        : Mono.just(i))
    .onErrorResume(t -> Mono.empty())
    .doOnNext(i -> log.info("Element: {}",i))
    .subscribe();

这是因为Element: 1 已将上游发布者替换为后备发布者(即onErrorResume),并从那时起又恢复了

要获得我们期望的输出:

Mono.empty()

我们必须将Element: 1 Element: 3 运算符放在onErrorResume的内部发布者上:

flatMap

现在,public Mono<Integer> func(int i) { return i = 2 ? Mono.error(new RuntimeException("error")) : Mono.just(i); } Flux.just(1,3) .flatMap(i -> func(i) onErrorResume(t -> Mono.empty())) .doOnNext(i -> log.info("Element: {}",i)) .subscribe(); 仅影响onErrorResume返回的内部发布者。如果func(i)中的运算符发生错误,func(i)将回退到onErrorResume,有效完成Mono.empty()而不会崩溃。这也仍然允许在回退运行之前应用错误处理运算符(例如Mono<T> doOnError。这是因为,与func(i)不同,它不会影响上游运算符,也不会在错误的位置更改下一个信号。

最终解决方案

在重复使用我的问题中的代码段后,我已经将Spring Cloud版本升级到onErrorContinue,并将代码更改为如下所示:

Hoxton.SR6

请注意,@Bean public Function<Flux<CustomMessage>,Mono<Void>> consumer(MyService myService) { return flux -> flux .doOnNext(msg -> log.info("New message received: {}",msg)) .flatMap(msg -> myService.processMessage(msg) .onErrorResume(throwable -> Mono.empty()) ) .then(); } 位于内部发布者上(onErrorResume内部)。

,

我认为该问题存在于以下代码中:

    .map(msg -> new RuntimeException("exception encountered!"))

地图行中的lambda返回一个异常,而不是抛出异常。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。