如何解决批量KafkaListener的ConsumerInterceptor
我的消费者配置有 kafka 批量侦听器配置和 @KafkaListener 消费消息列表。我有一个 ConsumerInterceptor,我想为每条记录设置唯一的 id,并将其值存储在映射诊断上下文 (MDC) 中。如果我的 kafka 侦听器使用单个消息,则唯一 id 是正确的。但是我的 kafka 侦听器消耗消息列表,因此 MDC.get("id") 仅获取最后一个值。怎么处理? 我的拦截器;
public class KafkaConsumerInterceptor implements ConsumerInterceptor<String,String>{
@Override
public ConsumerRecords<String,String> onConsume(ConsumerRecords<String,String> consumerRecords) {
ConsumerRecord<String,String> record = consumerRecords.iterator().next();
setId(record.headers().headers("id"));
return consumerRecords;
}
private void setId(Iterable<Header> idHeader) {
String id = UUID.randomUUID().toString();
if (idHeader.iterator().hasNext()) {
Header header = idHeader.iterator().next();
id = new String(header.value(),StandardCharsets.UTF_8);
}
MDC.put("id",id);
}
}
解决方法
您需要将 MDC.put()
移动到处理每条记录的任何位置,您不能为整个批次设置它。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。