微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Producer 有多个实例时会创建多个ProducerIds

如何解决Producer 有多个实例时会创建多个ProducerIds

在.yaml文件中,我们已经设置了

spring.cloud.stream.kafka.binder.configuration.enable.idempotence as true.

现在当应用程序启动时,我们可以看到像

这样的日志
[kafka-producer-network-thread | test_clientId] org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=test_clientId] ProducerId set to 0 with epoch 0

当第一条消息被生产到主题时,我们可以看到另一个 ProducerId 正在被使用,如下面的日志所示

[Ljava.lang.String;@720a86ef.container-0-C-1] org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=test_clientId] Instantiated an idempotent producer.
[Ljava.lang.String;@720a86ef.container-0-C-1] org.apache.kafka.common.utils.AppInfoParser - Kafka version : 2.0.1
[Ljava.lang.String;@720a86ef.container-0-C-1] org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fa14705e51bd2ce5
kafka-producer-network-thread | test_clientId] org.apache.kafka.clients.Metadata - Cluster ID: -9nblycHSsiksLIUbVH6Vw
-9nblycHSsiksLIUbVH6Vw
1512361 INFO [kafka-producer-network-thread | test_clientId] org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=test_clientId] ProducerId set to 1 with epoch 0

一旦 ProducerId 设置为 1,当从该应用程序发送任何新消息时,不会创建新的 ProducerId。

但是如果我们有多个应用程序在运行(都连接到同一个 kafka 服务器), 然后在该实例中创建新的 ProducerIds,同时也在启动和发送第一条消息时创建。

请建议我们是否可以限制创建新的 ProducerIds 并使用在创建应用程序时创建的相同的 ProducerIds。 另外,由于创建了很多ProducerIds,有什么方法可以重用已经创建的ProducerIds吗?(假设应用有多个ProducerIds,每个ProducerIds创建多个ProducerIds)

解决方法

第一个生产者是临时的 - 创建它是为了在初始化期间查找主题的现有分区。它立即关闭。

第二个生产者是用于后续记录发送的单个生产者。

producerId 和 epoch 由 broker 分配。它们必须是独一无二的。

使用新代理,您将获得第一个实例的 0 和 1,第二个实例的 2 和 3,4 和 5,...

即使停止所有实例,下一个也会得到 7 和 8。

你为什么要担心这个?

另一方面,如果您将 client.id 设置为,例如 foo,您将始终在所有实例上获得 foo-1foo-2

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?