如何解决如何动态/在启动时创建Kafka主题,以供生产者发送?
我开始将Confluent .NET库用于Kafka,并尝试实现一种用于Azure Service Bus的模式,以在生产方应用程序启动时创建主题(如果不存在则创建)。该如何在Kafka API中完成,并且可以完成?
这将使主题成为源代码控制的一部分,并在自动发布过程中进行配置,而不是按主题/环境手动设置。另外,我更希望开发人员不必转到每个Kafka实例/环境并先对其进行配置以匹配它们。
如果我不能这样做,则必须在发布过程中将其烘烤到bash脚本中,但希望在启动代码中使用它。
解决方法
您可以启用群集范围的配置auto.create.topics.enable。
如果新的生产者尝试将数据发送到尚不存在的主题,这将自动创建主题。
但是,请注意以下几点:
- 将使用复制,分区数和保留期的默认设置创建主题。确保根据需要更改这些默认设置。无论如何,所有自动创建的主题都将具有相同的配置。 生产者代码中主题名称配置中的
- typos可能导致不必要的主题创建。
或者,您可以使用AdminClient API。 here显示了一个示例:
static async Task CreateTopicAsync(string bootstrapServers,string topicName) { using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) { try { await adminClient.CreateTopicsAsync(new TopicSpecification[] { new TopicSpecification { Name = topicName,ReplicationFactor = 1,NumPartitions = 1 } }); } catch (CreateTopicsException e) { Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); } } }
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。