如何解决Spring @StreamListener:指数退避的无限重试
我正在尝试将我的使用者配置为使用指数退避,其中消息将被处理固定次数的尝试,并在其中应用退避期。但我没有得到预期的行为。
这是我的 Java 代码:
@EnableBinding({
MessagingConfiguration.EventTopic.class
})
public class MessagingConfiguration {
public interface EventTopic {
String INPUT = "events-channel";
@Input(INPUT)
@Nonnull
SubscribableChannel input();
}
}
@StreamListener(MessagingConfiguration.EventTopic.INPUT))
void handle(@Nonnull Message<Event> event) {
throw new RuntimeException("FAILING!");
}
如果我尝试下一个配置:
spring.cloud.stream:
bindings:
events-channel:
content-type: application/json
destination: event-develop
group: group-event-service
consumer:
max-attempts: 2
在所有重试 (20*) 后,我收到此消息:
Backoff FixedBackOff{interval=0,currentAttempts=10,maxAttempts=9} exhausted for ConsumerRecord(...
2 (consumer.max-attempts
) * 10 (FixedBackOff.currentAttempts
) = 20* 重试
所有这些重试都有 1 秒的延迟(默认退避周期)
如果我将配置更改为:
spring.cloud.stream:
bindings:
events-channel:
content-type: application/json
destination: event-develop
group: group-event-service
consumer:
max-attempts: 8
#Times in milliseconds
back-off-initial-interval: 1000
back-off-max-interval: 60000
back-off-multiplier: 2
在 8 次重试 (max-attempts
) 期间很好地应用了退避期,但是当 8 次重试完成时,一个新的重试周期开始无限期地阻塞主题。
在下一个版本中,也许我会实现一个更复杂的错误处理系统,但现在我只需要在重试后丢弃消息并获取下一个。
我做错了什么?
我在这里阅读了很多问题/答案、官方文档和互联网上的一些教程,但我没有找到避免无限重试的解决方案。
P.S.:我正在与 spring-cloud-stream (3.1.1)
和 spring-kafka (2.6.6)
解决方法
这是因为监听器容器现在默认配置为 SeekToCurrentErrorHandler
,尝试次数为 10 次。
这意味着您正在复合重试。
您可以使用 SeekToCurrentErrorHandler
ListenerContainerCustomizer
注入适当配置的 @Bean
。
建议不要在两个地方都配置重试;要么删除绑定配置并用适当配置的错误处理程序替换它,要么将错误处理程序更改为不重试。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。