如何解决Webflux 生产者消费者问题webClient
嗨,我有 WebFlux 和背压问题:
Flux.range(0,100)
.flatMap((Integer y) -> {
return reallySlowApi();
})
.doOnEach((Signal<String> x1) -> {
log("next-------" );
})
.subscribeOn(Schedulers.elastic())
.subscribe()
;
我如何将呼叫限制为每 5 秒一次呼叫。注意:只有reallySlowApi可以修改。
private Mono<String> reallySlowApi() {
return webClient
.get()
.retrieve()
.bodyToMono(String.class);
}
编辑:我知道 delayElements
但如果 Api 变得更慢,它不会解决问题。我需要使用 reallySlowApi
的最佳方式。
解决方法
一种方法是使用 delayElements()
public void run() {
Flux.range(0,100)
.delayElements(Duration.ofSeconds(5)) // only emit every 5 seconds
.flatMap(y -> reallySlowApi())
.doOnNext(x1 -> System.out.println("next-------"))
.blockLast(); // subscribe AND wait for the flux to complete
}
private Mono<String> reallySlowApi() {
return Mono.just("next");
}
您也可以使用 Flux.interval() 加上 take() 来限制迭代次数。
Flux.interval(Duration.ofSeconds(5))
.take(100)
请注意,您的示例中的 subscribeOn 没有做任何特别的事情,因为 subscribe 操作适用于不阻塞的 0-100 范围的生成。
,您可以在您的网络客户端代码中使用重试机制
.doOnError(error -> handleError(error.getMessage()))
.timeout(Duration.ofSeconds(ServiceConstants.FIVE))
.retryWhen(
Retry.backoff(retryCount,Duration.ofSeconds(ServiceConstants.FIVE))
.filter(throwable -> throwable instanceof TimeoutException)
)
,
只是把我找到的解决方案放在这里。 WebFlux 在映射响应时我们可以传递并发参数来解决这个问题。
flatMap(mapper,并发)
.flatMap((Integer y) -> {
return reallySlowApi();
},3)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。