如何解决如果自流中的上一个项目起有间隔,则发出一个新项目
我有一连串的外发消息。它们可以任意间隔发生。如果在发送最后一条消息后的一段时间内没有任何消息,我想发出一条新消息,该消息充当保持活动状态或心跳信号。
这是我尝试过的代码示例。假设我想在“ C”之后直到“ D”之后每1s发出一次心跳消息。
Flux.concat(
Flux.just("A","B","C").delayElements(Duration.ofMillis(500)),Flux.just("D").delaySequence(Duration.ofSeconds(5))
)
.windowTimeout(1,Duration.ofSeconds(1))
.flatMap(window -> window.switchIfEmpty(Mono.just("*")))
.log()
.blockLast();
这是输出
14:30:14.659 [parallel-2] INFO reactor.Flux.FlatMap.1 - onNext(A)
14:30:15.162 [parallel-3] INFO reactor.Flux.FlatMap.1 - onNext(B)
14:30:15.663 [parallel-4] INFO reactor.Flux.FlatMap.1 - onNext(C)
14:30:16.664 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:17.665 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:18.664 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:19.670 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:20.665 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:20.676 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(D)
14:30:20.677 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*) // Why?
14:30:20.679 [parallel-1] INFO reactor.Flux.FlatMap.1 - onComplete()
在此示例中,即使我指定了5秒,D也会跟随C 5.013秒,因此,如果在中间发出4或5个项目/心跳,我就不会感到烦恼。不需要那么精确。
但是为什么在D之后省略另一个项目?有办法解决吗?也许我使用了错误的操作。
我想我可以使用处理器来实现它,但是the documentation says
大多数时候,您应该避免使用处理器。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。