微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

卡夫卡偏移量增量,即使未达到Ack

如何解决卡夫卡偏移量增量,即使未达到Ack

我有一个消费者,它消费一条消息,先做一些繁重的工作,然后再屈服。

    @KafkaListener(topics = "${kafka.topic}",groupId = "group",containerFactory ="ContainerFactory")
  public void consumeMessage(@Payload Payload message,@Headers MessageHeaders headers,AckNowledgment ack) {

try {
  //Heavy Job
  ack.ackNowledge();
} catch (Exception e) {
  log("Error in Kafka Consumer);
    }
  }

现在,如果存在异常,则应转到catch块,并且不应发生确认;如果未发生确认,则应返回队列并再次进行处理。但这没有发生。偏移量更新,并选择下一条消息。 我了解到,消费者有一个民意调查大小,它可以一次选择多个消息。但是,即使未确认一条消息,也应重新处理它,而不是忽略它并更新偏移量。

这是Kafka消费者配置

`Map<String,Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONfig,consumerGroup);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONfig,5);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONfig,10000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONfig,1000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONfig,20000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig,"earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONfig,false);

解决方法

这是基础KafkaConsumer的预期行为。

在幕后,KafkaConsumer使用JavaDocs中描述的poll API:

“在每次轮询中,使用者将尝试使用上次消耗的偏移量作为起始偏移量并按顺序获取。上次消耗的偏移量可以通过seek(TopicPartition,long)手动设置,也可以自动设置为上一次提交的偏移量订阅的分区列表。”

这意味着,它不会检查最后一个提交的偏移量,而是检查最后一个已消耗的偏移量,然后按顺序获取数据。仅当重新开始工作时,它才会继续从该使用者组的最后提交的偏移量中读取数据,或者如果您基于auto_offset_reset配置使用新的使用者组。

为解决您的问题,我在catch块中看到了以下解决方案,您可以将其应用:

  • 不仅可以记录“ Kafka Consumer中的错误”,还可以关闭工作。修改代码并重新启动您的应用程序
  • 使用偏移量编号(导致异常)使用seek API将您的使用者重新定位到相同的偏移量。可以在here
  • 中找到有关seek方法的详细信息

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。