如何解决Kafka 流消息被多次处理
我目前正在开发一组微服务,这些服务通过使用 Kafka,更具体地说是流进行通信。
在大多数情况下,在开发环境中,一切似乎都运行良好,没有任何问题,但是,在临时环境中,我遇到了我无法理解为什么会发生的行为。
在某些情况下,应用程序多次处理通过流消费者接收的单个(或多个)消息。
似乎在消息的实际处理(即应用程序逻辑)需要一些时间完成时就会发生这种情况——考虑到它涉及 I/O 和其他繁重的操作,可能会这样做。
我对 Kafka 相当陌生,但据我所知,这与我的消费者没有足够快地提交偏移量有关,这反过来又没有将其标记为正在处理。我试图找出可能会缓解此问题的配置设置,但坦率地说,我不明白需要做什么。
作为参考,这是我的Stream的配置:
Properties props = builder.getConfiguration();
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,StreamsConfig.EXACTLY_ONCE);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,0);
该应用程序使用 Micronaut 和 Java 开发,分别为最新版本和版本 11。
如果有人可以建议解决此问题的方法,那将非常有帮助。
解决方法
afaik 你应该看看:
session.timeout.ms
在使用 Kafka 的组管理工具时用于检测客户端故障的超时。客户端定期发送心跳以向代理指示其活跃度。如果在此会话超时到期之前代理没有收到心跳,则代理将从组中删除此客户端并启动重新平衡。请注意,该值必须在由 group.min.session.timeout.ms 和 group.max.session.timeout.ms 在代理配置中配置的允许范围内。
request.timeout.ms
配置控制客户端等待请求响应的最长时间。如果在超时过去之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试用尽则请求失败。
见https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html
如果处理后不需要提交,也可以考虑异步处理,但如果处理失败则没有自动重新处理消息的选项
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。