如何解决Kafka Streams / 如何获取迭代器正在迭代的分区?
在我的 Kafka Streams 应用程序中,我有一个任务来设置一个预定的(按墙上时间)标点符号。标点符号遍历商店的条目并对其进行处理。像这样:
var store = context().getStateStore("MyStore");
var iter = store.all();
while (iter.hasNext()) {
var entry = iter.next();
// ... do something with the entry
}
// Print a summary (Now): N entries processed
// Print a summary (wish): N entries processed in partition P
由于我在这里处理的是单个存储(可能已分区),因此我假设标点符号的每次执行都绑定到该存储的单个分区。
是否可以找出标点符号作用于哪个分区? ProcessorContext.partition()
的 Java 文档指出此方法在标点符号内返回 -1
。
我已经阅读了 Kafka Streams: Punctuate vs Process 和那里的答案。我可以理解,任务通常与特定分区无关。但是迭代器应该绑定到 IMO。
如何找到分区?
或者我的假设是,存储迭代器的特定实例与分区相关联是错误的吗?
我需要它做什么:我想在一些日志消息中包含分区号。现在,我有几条几乎相同的日志消息,说明标点符号会这样做和那样做。为了使这些消息“独一无二”,我想在其中包含分区号。
解决方法
只是在这里发布https://issues.apache.org/jira/browse/KAFKA-12328中提供的答案:
我刚刚使用了 context.taskId()
。它在值的末尾、下划线之后包含分区号。这对我来说已经足够了。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。