微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在 Spring Boot 中重新启动监听器后无法读取未提交的偏移量

如何解决在 Spring Boot 中重新启动监听器后无法读取未提交的偏移量

我正在尝试根据某些条件在 Kafka 中提交偏移量。 这是我的监听器代码

@KafkaListener(topics = "test")
public void getTopics(@RequestBody String emp,AckNowledgment ackNowledgment) {
    
Employee model = gson.fromJson(emp,Employee.class);
if(model.getId()%2!=0)
{
    System.out.println("Kafka event consumed is: " + emp);
    ackNowledgment.ackNowledge();
}

else {
    System.out.println("Model converted value: " + model); 
}
}

这是我的 application.prop 文件配置

spring.kafka.listener.ack-mode=manual-immediate
spring.kafka.consumer.auto-offset-reset=earliest

起初,当我启动 springboot 应用程序时,我甚至在主题中传递了一些员工 ID。 然后我在创建 if(model.getId()%2==0) 后重新启动我的程序,这是相反的条件。我能够获取我没有先提交的值。

我通过在 application.properties 中添加 spring.kafka.consumer.enable-auto-commit=false 重试相同的过程。但是这次消费者没有重试更早。我以为我应该能够更早地获得我得到的东西如果它不应该工作,因为自动提交应该是错误的。 感谢您的建议。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。