微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Reactor - 在处理错误的情况下延迟 Flux 元素

如何解决Reactor - 在处理错误的情况下延迟 Flux 元素

我遇到了与 this question 类似的问题,但我没有看到可接受的答案。我已经研究过了,没有得到满意的答案。

我有一个轮询数量为“x”的反应式 Kafka 消费者(Spring Reactor),应用程序使用反应式 Web 客户端将轮询的消息推送到反应式端点。这里的问题是外部服务可能会以不同的方式超时执行,当我们看到很多故障时,我将不得不调整 Kafka 消费者以在断路器打开(或启动背压)时轮询更少的消息。目前的反应堆有没有办法自动

  1. 在断路器处于断开状态时做出反应并减少轮询量或减慢消耗。
  2. 当电路关闭时,将轮询量增加到之前的状态(如果外部服务关闭,则会按比增加)。

我不想使用 delayElementsdelayUntil,因为它们本质上大多是静态的,并且希望应用程序在运行时做出反应。如何配置这些端到端背压?当电路关闭、部分关闭和在应用配置中打开时,我会为消费者提供值。

解决方法

由于背压基于消费者的缓慢,实现这一点的一种方法是将某些异常类型转换为延迟。您可以将 onErrorResume 用于此目的,如下所示:

long start = System.currentTimeMillis();

Flux.range(1,1000)
        .doOnNext(item -> System.out.println("Elpased " + (System.currentTimeMillis() - start) + " millis for item: " + item))
        .flatMap(item -> process(item).onErrorResume(this::slowDown),5) // concurrency limit for demo
        .blockLast();

System.out.println("Flow took " + (System.currentTimeMillis() - start) + " milliseconds.");

private Mono<Integer> process(Integer item) {
    // simulate error for some items
    if (item >= 50 && item <= 100) {
        return Mono.error(new RuntimeException("Downstream failed."));
    }

    // normal processing
    return Mono.delay(Duration.ofMillis(10))
            .thenReturn(item);
}

private Mono<Integer> slowDown(Throwable e) {
    if (e instanceof RuntimeException) { // you could check for circuit breaker exception
        return Mono.delay(Duration.ofMillis(1000)).then(Mono.empty()); // delay to slow down
    }

    return Mono.empty(); // no delay for other errors
}

如果您检查此代码的输出,您会发现第 50 项和第 100 项之间的速度有些慢,但前后都以正常速度运行。

请注意,我的示例没有使用 Kafka。由于您使用的是支持背压的 reactor-kafka 库,因此它的工作方式应该与这个虚拟示例相同。

此外,由于 Flux 可能会同时处理项目,因此减速不会立即发生,它会在适当减速之前尝试处理一些额外的项目。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。