微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

反序列化错误并记录分区、主题和偏移量

如何解决反序列化错误并记录分区、主题和偏移量

我正在使用发送到我的 DefaultKafkaConsumerFactory 上的 ErrorHandlingDeserialiser 处理反序列化错误

我有自定义代码

try (ErrorHandlingDeserializer<MyEvent> errorHandlingDeserializer = new ErrorHandlingDeserializer<>(theRealDeserialiser)) {
            errorHandlingDeserializer.setFailedDeserializationFunction(myCustomFunction::apply);
            return new DefaultKafkaConsumerFactory<>(getConsumerProperties(),consumerKeyDeserializer,errorHandlingDeserializer);
        }

我的自定义函数进行一些处理并发布到毒丸主题并返回null

当发生反序列化错误时,我想记录主题、分区和偏移量。我能想到的唯一方法是停止在函数中返回 null 并返回 MyEvent 的新子类型。我的 KafkaListener 然后可以询问新的子类型。

我有一个@KafkaListener 组件,它监听 Co​​nsumerRecord 如下:

 @KafkaListner(....)
 public void onMessage(ConsumerRecord<String,MyEvent> record) {
   ...
   ...
   
// if record.value instance of MynewsubType
//   I have access to the topic,partition and offset here,so I Could log it here
// I'd have to check that the instance of MyEvent is actually my sub type representing a Failed record.

 }

这是这样做的方式吗?我知道 null 对 Kafka 有特殊意义。

这种子类型方法的缺点是,我必须使用 ErrorHandlingDeserialiser 为每种类型创建一个子类型。

解决方法

不要使用函数;相反,抛出的 DeserializationException 直接传递给容器的 ErrorHandler

SeekToCurrentErrorHandler 认为这些异常是致命的并且不会重试它们,它将记录传递给恢复器。

有一个提供的 DeadLetterPublishingRecoverer 发送记录。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling

https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。