如何解决如果偏移量在 Kafka 中被破坏会发生什么?
我们如何处理偏移损坏?
我想将偏移量日志保存在其他地方,或者拍摄偏移量的快照。我该怎么做?
解决方法
Kafka 在名为 _consumer_offsets 的主题中存储偏移量。消费者将偏移量提交到该主题中,auto.offset.reset 的值(earliest/latest/none)决定了从分区开始读取消息的策略。偏移日志保留时间由代理属性指定。
auto.offset.reset = latest
=> 将从上次提交的偏移量开始读取消息,如果未找到,则它将等待新消息到达并从那里开始。不抛出异常
auto.offset.reset = earliest
=> 同样不会抛出任何异常,如果存在偏移,它将从头开始读取消息。
auto.offset.reset = none
=> 找不到偏移量时会抛出异常。
您可以使用assign和seek获取特定数据
//assign - set topic and partition you you want to read from using TopicPartion
TopicPartition topicPartitionToReadFrom = new
TopicPartition(topic,0);
long offsetToReadFrom = 15L;
consumer.assign(Arrays.asList(topicPartitionToReadFrom));
//seek - set position of the consumer manually by calling
//KafkaConsumer.seek(TopicPartition partition,long offset)
consumer.seek(topicPartitionToReadFrom,offsetToReadFrom);
存储偏移日志 => _consumer_offsets 是主题,因此您可以write a consumer for this topic 并将消息存储到您选择的存储中。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。