如何解决ParallelFlux<T> Reactor 3 的 .publish(Function<T, R>) 等价物
我有一些像这样的 Reactor Flux:
SomeFlux.map(thing -> makeOtherThing(thing))
.publish(fluxFunction)
.subscribe();
其中 .publish()
的参数是 fluxFunction
,它是 Java 8 实现的对象Function<T,R>
功能界面如下:
public class FluxFunction implements Function<Flux<OtherThing>,Flux<DiffOtherThing>> {
@Override
Flux<DiffOtherThing> apply(Flux<OtherThing>) {
// some code which "consumes" incoming Flux and outputs another
}
}
我的问题是,如果我把上面的代码改成:
SomeFlux.parallel()
.runOn(Schedulers.parrallel())
.map(thing -> makeOtherThing(thing))
.publish(fluxFunction)
.subscribe();
.publish(fluxFunction) 方法无效,因为我现在正在处理 ParallelFlux,是否有一些等效的 .publish(Function) 可以在 parallelFlux 上使用?如果是这样,我将如何编辑我的 Function 实现以适应这一点?
解决方法
所以看起来等效的(至少在我的情况下)是:
SomeFlux.parallel()
.runOn(Schedulers.parallel())
.map(thing -> makeOtherThing(thing))
.as(fluxFunction) // changed ".publish()" to ".as()"
.subscribe()
我也将 fluxFunction
重新定义为:
// Now operates on ParallelFlux<> rather than Flux<>
public class FluxFunction implements Function<ParallelFlux<OtherThing>,ParallelFlux<DiffOtherThing>> {
@Override
ParallelFlux<DiffOtherThing> apply(ParallelFlux<OtherThing>) {
// some code which "consumes" incoming ParallelFlux and outputs another
}
}
这对我来说似乎很完美。
我希望这可以帮助任何发现自己处于类似困境的人。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。