微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink 偏移量在手动重置 kafka 偏移量时进入不一致状态

如何解决Flink 偏移量在手动重置 kafka 偏移量时进入不一致状态

我们有一个从 kafka 读取 msg 的 flink 流应用程序。 由于某种原因,我们不得不从 kafka reset 命令将 kafka 偏移重置为最新,因为有大量堆积。我们希望 flink 应用程序跳过所有这些消息,并从重置后的新消息开始。

问题是因为 flink 在内部管理它的偏移量,它不知道这个重置,它现在只从后向读取 msg(重置前的偏移点),现在也不能提交偏移量。因此,每次重新启动 flink 应用程序时,它都会再次从同一点读取。所以我们在每次重启时都有重复的 msg。

我知道我们不应该在 flink kafka 应用程序中手动重置偏移量。但我们如何从中恢复。

我已尝试将 auto.offset.config 设置为最新,但它仍然会再次读取这些消息。

解决方法

只有当 Flink 从故障中恢复或从保存点或检查点手动重启时,它才会使用记录在检查点或保存点中的偏移量。

否则,Flink Kafka 消费者将从消费者组在 Kafka 代理中提交的偏移量或您在代码中明确指定的偏移量开始读取,即,

myConsumer.setStartFromEarliest();     // start from the earliest record possible
myConsumer.setStartFromLatest();       // start from the latest record
myConsumer.setStartFromTimestamp(...); // start from specified epoch timestamp (msecs)
myConsumer.setStartFromGroupOffsets(); // the default behaviour

我不知道如何将这些事实与您报告的内容相协调。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。