微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

一次只发出 1 个事件的 Reactor Sink?

如何解决一次只发出 1 个事件的 Reactor Sink?

我正在玩 Replaying Reactor Sinks,我正在尝试实现单播和重播处理器的混合。我希望它同时只向一个订阅者发送(UnicastProcessor),但它也可以在订阅时发送一个认值(ReplayProcessor)。以下是与真实案例类似的内容

Flux<Boolean> monoC = Sinks.many().replay().latestOrDefault(true).asFlux().doOnNext(integer -> System.out.println(new Date() + " - " + Thread.currentThread().getName() + "    emiting next"));
for(int i = 0; i < 5; i++) {
    new Thread(() -> {
        monoC.flatMap(unused ->
                webClientBuilder.build()
                        .get()
                        .uri("https://www.google.com")
                        .retrieve()
                        .toEntityFlux(String.class)
                        .doOnSuccess(stringResponseEntity -> {
                            System.out.println(new Date() + " - " + Thread.currentThread().getName() + "    finished processing");
                        })
        ).subscribe();
    }).start();
}

那就是打印:

emiting next
...
emiting next
finished processing
...
finished processing

相反,我希望它打印:

emiting next
finished processing
...
emiting next
finished processing

更新,对真实案例进行更多说明:

真实案例场景是:我有一个 Spring WebFlux 应用程序,它的作用类似于中继,它在特定端点 A 上接收请求,并将其中继到另一个微服务 B。如果我,该微服务可以回复 429走得太快,在标题中我必须等待多长时间才能再次重试。我已经使用 .retry 运算符和 Mono.delay 实现了重试,但与此同时,我可以在我的第一个端点 A 上收到另一个请求,该请求必须被阻塞,直到 Mono.delay 完成。

我正在尝试使用 Replay Sink 来实现这一点,因此在收到 429 后,我会向接收器发出“false”,并且在 Mono.delay 结束后,它会向接收器发出 true,因此如果在平均当我收到关于 A 的任何进一步请求时,它可以过滤掉所有错误并等待发出真实。

最重要的问题是,当我收到太多要在 A 上中继的请求时,微服务 B 开始响应缓慢,并变得过载。因此,我想限制接收器发射的速率。准确地说,我希望发布者发出一个值,但在订阅者点击 onCompleted 之前不要再发出。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。