微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

具有并行性的 Reactor GroupBy 在同一线程上运行

如何解决具有并行性的 Reactor GroupBy 在同一线程上运行

我正在尝试实现每个组的并行性,其中分组元素并行运行,并且在组内每个元素按顺序工作。但是对于下面的代码,第一个发射使用并行线程,但对于后续发射它使用一些不同的线程池。如何实现组内元素的组并行和顺序执行。

public class ReactorTest implements SmartLifecycle,ApplicationListener<ApplicationReadyEvent> {

    private AtomicInteger counter = new AtomicInteger(1);
    private Many<Integer> healthSink;
    private disposable dispose;

    private scheduledexecutorservice executor;

    @Override
    public void start() {
        executor = Executors.newSingleThreadScheduledExecutor();
        healthSink = Sinks.many().unicast().onBackpressureBuffer();
        dispose = healthSink.asFlux().groupBy(v -> v % 3 == 0).parallel(10)
                .runOn(Schedulers.newBoundedElastic(10,100,"k-task")).log().flatMap(v -> v)
                .subscribe(v -> log.info("Data {}",v));
    }

    @Override
    public void stop() {
        executor.shutdownNow();
        if (dispose != null) {
            dispose.dispose();
        }
    }

    @Override
    public boolean isRunning() {
        return executor == null ? false : !executor.isShutdown();
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {

        executor.scheduleAtFixedrate(() -> {
            healthSink.tryEmitNext(counter.incrementAndGet());
            healthSink.tryEmitNext(counter.incrementAndGet());
            healthSink.tryEmitNext(counter.incrementAndGet());
        },10,TimeUnit.SECONDS);
    }
}

日志

2021-07-27 14:15:34.189  INFO 22212 --- [  restartedMain] i.g.kprasad99.reactor.DemoApplication    : Started DemoApplication in 1.464 seconds (JVM running for 1.795)
2021-07-27 14:15:44.206  INFO 22212 --- [       k-task-1] reactor.Parallel.RunOn.1                 : onNext(UnicastGroupedFlux)
2021-07-27 14:15:44.207  INFO 22212 --- [       k-task-2] reactor.Parallel.RunOn.1                 : onNext(UnicastGroupedFlux)
2021-07-27 14:15:44.207  INFO 22212 --- [       k-task-1] io.github.kprasad99.reactor.ReactorTest  : Data 2
2021-07-27 14:15:44.207  INFO 22212 --- [       k-task-2] io.github.kprasad99.reactor.ReactorTest  : Data 3
2021-07-27 14:15:44.207  INFO 22212 --- [       k-task-1] io.github.kprasad99.reactor.ReactorTest  : Data 4
2021-07-27 14:15:54.200  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 5
2021-07-27 14:15:54.200  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 6
2021-07-27 14:15:54.200  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 7
2021-07-27 14:16:04.195  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 8
2021-07-27 14:16:04.195  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 9
2021-07-27 14:16:04.195  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 10
2021-07-27 14:16:14.206  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 11
2021-07-27 14:16:14.206  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 12
2021-07-27 14:16:14.206  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 13
2021-07-27 14:16:24.197  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 14
2021-07-27 14:16:24.197  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 15
2021-07-27 14:16:24.197  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 16
2021-07-27 14:16:34.196  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 17
2021-07-27 14:16:34.196  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 18
2021-07-27 14:16:34.196  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 19
2021-07-27 14:16:44.201  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 20
2021-07-27 14:16:44.201  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 21
2021-07-27 14:16:44.201  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 22
2021-07-27 14:16:54.201  INFO 22212 --- [pool-3-thread-1] io.github.kprasad99.reactor.ReactorTest  : Data 23

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。