如何解决Kafka 侦听器的请求/回复和重试策略
我目前正在为 Kafka 侦听器实现错误处理机制。这就是我想要实现的:当 Kafka Listener 中发生异常时,我想重试处理我的记录 10 次,如果仍然失败,将其发送到死信队列并在有请求时通知发件人/回复场景(例如,返回特定的 RecordFailure 对象)。 我首先定义了一个 ErrorHandler bean,它运行良好,但没有提供在失败时返回值的可能性。然后我开始定义一个 KafkaListenerErrorHandler bean 并将其指定为我的 Kafka Listener 的参数。它允许我返回一个特定的值,但我丢失了 ErrorHandler 定义的重试和死信队列转发策略。最后,我使用 RetryTemplate 和 RecoveryCallback 配置我的容器工厂。我认为它会按预期工作但是当使用 ReplyingKafkaTemplate 发送我的消息时,我总是最终收到超时异常并意识到,因为我定义了一个 RetryTemplate,RetryingMessageListenerAdapter 中的 onMessage() 方法被调用而不是 ReplyingKafkaTemplate 一个.我现在质疑场景本身:将 RetryTemplate、RecoveryCallback 和 ReplyingKafkaTemplate 结合起来使用 Kafka Listener 启用请求/回复与重试策略是否有意义?如果是这样,我在这里错过了什么?
感谢您抽出宝贵时间。
解决方法
由于错误处理程序支持退避和重试,因此在侦听器级别使用重试模板大多是多余的。
解决您的特定问题的一种方法是启用 deliveryAttemptHeader
。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#delivery-header
然后,在您的侦听器错误处理程序中,检查标题,当达到特定尝试次数时,将消息发布到死信主题并返回错误结果。在达到计数之前重新抛出异常,以便 SeekToCurrentErrorHandler
重新传递记录。
只需确保 STCEH 有足够的重试次数,以便它始终重试,从而使侦听器错误处理程序能够完成其工作。
编辑
这是一个示例,展示了如何通过在标头中添加原始 ConsumerRecord
来使用侦听器错误处理程序中的 DLPR...
@SpringBootApplication
public class So66982480Application {
public static void main(String[] args) {
SpringApplication.run(So66982480Application.class,args);
}
@Bean
ReplyingKafkaTemplate<String,String,String> rkt(ProducerFactory<String,String> pf,ConcurrentKafkaListenerContainerFactory<String,String> factory,KafkaTemplate<String,String> template) {
factory.getContainerProperties().setDeliveryAttemptHeader(true);
factory.setReplyTemplate(template);
ConcurrentMessageListenerContainer<String,String> container = factory.createContainer("so66982480-replies");
container.getContainerProperties().setGroupId("so66982480-replies");
return new ReplyingKafkaTemplate<>(pf,container);
}
@Bean
RecordMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter() {
@Override
public Message<?> toMessage(ConsumerRecord<?,?> record,Acknowledgment acknowledgment,Consumer<?,?> consumer,Type type) {
Message<?> message = super.toMessage(record,acknowledgment,consumer,type);
return MessageBuilder.fromMessage(message)
.setHeader(KafkaHeaders.RAW_DATA,record)
.build();
}
};
return converter;
}
@Bean
KafkaTemplate<String,String> template(ProducerFactory<String,String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
NewTopic topic1() {
return TopicBuilder.name("so66982480").partitions(1).replicas(1).build();
}
@Bean
NewTopic topic2() {
return TopicBuilder.name("so66982480-replies").partitions(1).replicas(1).build();
}
@Bean
NewTopic topic3() {
return TopicBuilder.name("so66982480.DLT").partitions(1).replicas(1).build();
}
@Bean
KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg,ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT,Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA,ConsumerRecord.class),ex);
return "FAILED";
}
throw ex;
};
}
@Bean
DeadLetterPublishingRecoverer recoverer(KafkaOperations<String,String> template) {
return new DeadLetterPublishingRecoverer(template);
}
@KafkaListener(id = "so66982480",topics = "so66982480",errorHandler = "eh")
@SendTo
public String listen(String in) {
throw new RuntimeException("test");
}
@KafkaListener(id = "so66982480.DLT",topics = "so66982480.DLT")
public void dlt(String in) {
System.out.println("From DLT:" + in);
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String,String> template) {
return args -> {
RequestReplyFuture<String,String> future =
template.sendAndReceive(new ProducerRecord<String,String>("so66982480",null,"test"),Duration.ofSeconds(30));
System.out.println(future.getSendFuture().get(10,TimeUnit.SECONDS).getRecordMetadata());
System.out.println(future.get(30,TimeUnit.SECONDS).value());
};
}
}
spring.kafka.consumer.auto-offset-reset=earliest
From DLT:test
FAILED
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。