如何解决将助焊剂分为单声道头部和尾部
我想将Flux
分为两部分:第一个元素的Mono
(头部),其他所有元素的Flux
(尾部)。
在此过程中,不应重新订阅基本Flux
。
不起作用的示例:
final Flux<Integer> baseFlux = Flux.range(0,3).log();
final Mono<Integer> head = baseFlux.next();
final Flux<Integer> tail = baseFlux.skip(1L);
assertthat(head.block()).isEqualTo(0);
assertthat(tail.collectList().block()).isEqualTo(Arrays.asList(1,2));
有关此日志,如下所示,您将看到基本Flux将被重新订阅两次:
[main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] Fluxrange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(0)
[main] INFO reactor.Flux.Range.1 - | cancel()
[main] INFO reactor.Flux.Range.1 - | onSubscribe([Synchronous Fuseable] Fluxrange.RangeSubscription)
[main] INFO reactor.Flux.Range.1 - | request(unbounded)
[main] INFO reactor.Flux.Range.1 - | onNext(0)
[main] INFO reactor.Flux.Range.1 - | onNext(1)
[main] INFO reactor.Flux.Range.1 - | onNext(2)
[main] INFO reactor.Flux.Range.1 - | onComplete()
[main] INFO reactor.Flux.Range.1 - | request(1)
我的实际情况是我的基本Flux
包含CSV文件的行,第一行是文件的标题,解析所有后续行都需要。基本Flux
仅基于InputStream
我为此找到的唯一相关资源是this question,但是我发现这有点不符合我的需求。
解决方法
由于comments中的建议,我得以设计出以下解决方案:
final Flux<Integer> baseFlux = Flux.range(0,3).log();
final Flux<? extends Tuple2<? extends Integer,Integer>> zipped = baseFlux
.switchOnFirst((signal,flux) -> (signal.hasValue()
? Flux.zip(Flux.just(signal.get()).repeat(),flux.skip(1L))
: Flux.empty()));
final List<? extends Tuple2<? extends Integer,Integer>> list = zipped.collectList().block();
assertThat(list.stream().map(Tuple2::getT1)).isEqualTo(Arrays.asList(0,0));
assertThat(list.stream().map(Tuple2::getT2)).isEqualTo(Arrays.asList(1,2));
它将第一个元素之后的基本Flux
转换为原始元素的尾部,重复压缩该元素。并且它只订阅一次baseFlux
。
我不确定这是最好的解决方案,因为与有状态(“热”)通量的解决方案相比,它将创建很多Tuple2
对象,这些对象最终将进行GC处理基于baseFlux
,可以保持原始订阅有效。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。