微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

根据 Consumer.committablePartitionedSource 中分配的分区数调整并行度

如何解决根据 Consumer.committablePartitionedSource 中分配的分区数调整并行度

我正在尝试使用 Consumer.committablePartitionedSource() 并为每个分区创建流,如下所示

    public void setup() {
        control = Consumer.committablePartitionedSource(consumerSettings,Subscriptions.topics("chat").withPartitionAssignmentHandler(new PartitionAssignmentListener()))
                .mapAsyncUnordered(Integer.MAX_VALUE,pair -> setupsource(pair,committerSettings))
                .toMat(Sink.ignore(),Consumer::createDrainingControl)
                .run(Materializer.matFromSystem(actorSystem));
    }

    private CompletionStage<Done> setupsource(Pair<TopicPartition,Source<ConsumerMessage.CommittableMessage<String,String>,NotUsed>> pair,CommitterSettings committerSettings) {
        LOGGER.info("SETTING UP PARTITION-{} SOURCE",pair.first().partition());
        return pair.second().mapAsync(16,msg -> CompletableFuture.supplyAsync(() -> consumeMessage(msg),actorSystem.dispatcher())
                .thenApply(param -> msg.committableOffset()))
                .withAttributes(ActorAttributes.supervisionStrategy(ex -> Supervision.restart()))
                .runWith(Committer.sink(committerSettings),Materializer.matFromSystem(actorSystem));
    }

在为每个分区设置源时,我使用了并行性,我想根据分配给节点的分区数进行更改。我可以在第一次将分区分配给节点时做到这一点。但是随着新节点加入集群,分配的分区将被撤销和分配。这次流不发出已经存在的分区(由于 kafka 协作重新平衡协议)以重新配置并行性。

在这里,我在所有源中共享同一个调度程序,如果我在重新平衡时保持相同的并行性,我觉得每个分区消息处理的公平机会是不可能的。我对么?请指正

解决方法

如果我理解正确,您希望在 Kafka 重新平衡主题分区时动态变化的 Source 数量具有固定的并行性。

查看 Alpakka Kafka 文档 here 中的第一个示例。可以像这样根据您的示例进行调整:

 Consumer.DrainingControl<Done> control =
      Consumer.committablePartitionedSource(consumerSettings,Subscriptions.topics("chat"))
              .wireTap(p -> LOGGER.info("SETTING UP PARTITION-{} SOURCE",p.first().partition()))
              .flatMapMerge(Integer.MAX_VALUE,Pair::second)
              .mapAsync(
                16,msg -> CompletableFuture
                         .supplyAsync(() -> consumeMessage(msg),actorSystem.dispatcher())
                         .thenApply(param -> msg.committableOffset()))
              .withAttributes(
                ActorAttributes.supervisionStrategy(
                  ex -> Supervision.restart()))
              .toMat(Committer.sink(committerSettings),Consumer::createDrainingControl)
              .run(Materializer.matFromSystem(actorSystem));

因此,基本上 Consumer.committablePartitionedSource() 将在 Kafka 将分区分配给此消费者时发出 Source,并在先前分配的分区重新平衡并从此消费者中移除时终止此类 Source。>

flatMapMerge 将获取这些 Source 并合并它们输出的消息。

所有这些消息都将在 mapAsync 阶段竞争以得到处理。这种竞争的公平性实际上取决于上面的 flatMapMerge,它应该为所有 Source 提供平等的机会来发出它们的消息。无论有多少 Source 输出消息,它们都将在这里共享固定的并行性,我相信这就是您所追求的。

所有这些消息最终都会到达处理偏移提交的 Commiter.sink

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?