微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如果自流中的上一个项目起有间隔,则发出一个新项目

如何解决如果自流中的上一个项目起有间隔,则发出一个新项目

我有一连串的外发消息。它们可以任意间隔发生。如果在发送最后一条消息后的一段时间内没有任何消息,我想发出一条新消息,该消息充当保持活动状态或心跳信号。

marbles diagram

这是我尝试过的代码示例。假设我想在“ C”之后直到“ D”之后每1s发出一次心跳消息。

Flux.concat(
        Flux.just("A","B","C").delayElements(Duration.ofMillis(500)),Flux.just("D").delaySequence(Duration.ofSeconds(5))
    )
    .windowTimeout(1,Duration.ofSeconds(1))
    .flatMap(window -> window.switchIfEmpty(Mono.just("*")))
    .log()
    .blockLast();

这是输出

14:30:14.659 [parallel-2] INFO reactor.Flux.FlatMap.1 - onNext(A)
14:30:15.162 [parallel-3] INFO reactor.Flux.FlatMap.1 - onNext(B)
14:30:15.663 [parallel-4] INFO reactor.Flux.FlatMap.1 - onNext(C)
14:30:16.664 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:17.665 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:18.664 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:19.670 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:20.665 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)
14:30:20.676 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(D)
14:30:20.677 [parallel-1] INFO reactor.Flux.FlatMap.1 - onNext(*)    // Why?
14:30:20.679 [parallel-1] INFO reactor.Flux.FlatMap.1 - onComplete()

在此示例中,即使我指定了5秒,D也会跟随C 5.013秒,因此,如果在中间发出4或5个项目/心跳,我就不会感到烦恼。不需要那么精确。

但是为什么在D之后省略另一个项目?有办法解决吗?也许我使用了错误的操作。

我想我可以使用处理器来实现它,但是the documentation says

大多数时候,您应该避免使用处理器。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。