微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Webflux 生产者消费者问题webClient

如何解决Webflux 生产者消费者问题webClient

嗨,我有 WebFlux 和背压问题:

    Flux.range(0,100)
            .flatMap((Integer y) -> {
                return reallySlowApi(); 
            })
            .doOnEach((Signal<String> x1) -> {
                log("next-------" );
            })
            .subscribeOn(Schedulers.elastic())
            .subscribe()
    ;

我如何将呼叫限制为每 5 秒一次呼叫。注意:只有reallySlowApi可以修改

private Mono<String> reallySlowApi() {
    return webClient
            .get()
            .retrieve()
            .bodyToMono(String.class);
}

编辑:我知道 delayElements 但如果 Api 变得更慢,它不会解决问题。我需要使用 reallySlowApi 的最佳方式。

解决方法

一种方法是使用 delayElements()

  public void run() {
    Flux.range(0,100)
        .delayElements(Duration.ofSeconds(5)) // only emit every 5 seconds
        .flatMap(y -> reallySlowApi())
        .doOnNext(x1 -> System.out.println("next-------"))
        .blockLast(); // subscribe AND wait for the flux to complete
  }

  private Mono<String> reallySlowApi() {
    return Mono.just("next");
  }

您也可以使用 Flux.interval() 加上 take() 来限制迭代次数。

   Flux.interval(Duration.ofSeconds(5))
        .take(100)

请注意,您的示例中的 subscribeOn 没有做任何特别的事情,因为 subscribe 操作适用于不阻塞的 0-100 范围的生成。

,

您可以在您的网络客户端代码中使用重试机制

.doOnError(error -> handleError(error.getMessage()))
            .timeout(Duration.ofSeconds(ServiceConstants.FIVE))
            .retryWhen(
                    Retry.backoff(retryCount,Duration.ofSeconds(ServiceConstants.FIVE))
                            .filter(throwable -> throwable instanceof TimeoutException)
            )
,

只是把我找到的解决方案放在这里。 WebFlux 在映射响应时我们可以传递并发参数来解决这个问题。

flatMap(mapper,并发)

.flatMap((Integer y) -> {
     return reallySlowApi(); 
 },3)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。