如何解决根据 Consumer.committablePartitionedSource 中分配的分区数调整并行度
我正在尝试使用 Consumer.committablePartitionedSource()
并为每个分区创建流,如下所示
public void setup() {
control = Consumer.committablePartitionedSource(consumerSettings,Subscriptions.topics("chat").withPartitionAssignmentHandler(new PartitionAssignmentListener()))
.mapAsyncUnordered(Integer.MAX_VALUE,pair -> setupsource(pair,committerSettings))
.toMat(Sink.ignore(),Consumer::createDrainingControl)
.run(Materializer.matFromSystem(actorSystem));
}
private CompletionStage<Done> setupsource(Pair<TopicPartition,Source<ConsumerMessage.CommittableMessage<String,String>,NotUsed>> pair,CommitterSettings committerSettings) {
LOGGER.info("SETTING UP PARTITION-{} SOURCE",pair.first().partition());
return pair.second().mapAsync(16,msg -> CompletableFuture.supplyAsync(() -> consumeMessage(msg),actorSystem.dispatcher())
.thenApply(param -> msg.committableOffset()))
.withAttributes(ActorAttributes.supervisionStrategy(ex -> Supervision.restart()))
.runWith(Committer.sink(committerSettings),Materializer.matFromSystem(actorSystem));
}
在为每个分区设置源时,我使用了并行性,我想根据分配给节点的分区数进行更改。我可以在第一次将分区分配给节点时做到这一点。但是随着新节点加入集群,分配的分区将被撤销和分配。这次流不发出已经存在的分区(由于 kafka 协作重新平衡协议)以重新配置并行性。
在这里,我在所有源中共享同一个调度程序,如果我在重新平衡时保持相同的并行性,我觉得每个分区消息处理的公平机会是不可能的。我对么?请指正
解决方法
如果我理解正确,您希望在 Kafka 重新平衡主题分区时动态变化的 Source
数量具有固定的并行性。
查看 Alpakka Kafka 文档 here 中的第一个示例。可以像这样根据您的示例进行调整:
Consumer.DrainingControl<Done> control =
Consumer.committablePartitionedSource(consumerSettings,Subscriptions.topics("chat"))
.wireTap(p -> LOGGER.info("SETTING UP PARTITION-{} SOURCE",p.first().partition()))
.flatMapMerge(Integer.MAX_VALUE,Pair::second)
.mapAsync(
16,msg -> CompletableFuture
.supplyAsync(() -> consumeMessage(msg),actorSystem.dispatcher())
.thenApply(param -> msg.committableOffset()))
.withAttributes(
ActorAttributes.supervisionStrategy(
ex -> Supervision.restart()))
.toMat(Committer.sink(committerSettings),Consumer::createDrainingControl)
.run(Materializer.matFromSystem(actorSystem));
因此,基本上 Consumer.committablePartitionedSource()
将在 Kafka 将分区分配给此消费者时发出 Source
,并在先前分配的分区重新平衡并从此消费者中移除时终止此类 Source
。>
flatMapMerge
将获取这些 Source
并合并它们输出的消息。
所有这些消息都将在 mapAsync
阶段竞争以得到处理。这种竞争的公平性实际上取决于上面的 flatMapMerge
,它应该为所有 Source
提供平等的机会来发出它们的消息。无论有多少 Source
输出消息,它们都将在这里共享固定的并行性,我相信这就是您所追求的。
所有这些消息最终都会到达处理偏移提交的 Commiter.sink
。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。