微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Kafka - 如何将过期的消息移动到不同的主题?

如何解决Kafka - 如何将过期的消息移动到不同的主题?

要求是有一个 Kafka 主题接受消息。但是消息必须在一定时间内被消费和确认。如果不是,另一个进程应该接收消息并启动一个不同的进程(比如中止或回滚进程)。

一种方法是在时间过去后将消息移动到不同的主题。另一个消费者可以收听它并启动中止过程。这在卡夫卡怎么可能?或者是否有其他方法可用于此?

解决方法

您可以在主题中获取消息的时间戳并根据它决定做什么:

@KafkaListener(id = "groupId",topics = "topic1")
public void listen(String msg,@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
    if(ts > currentTs - validGapTs){
        //time stamp is valid. So,do your stuff
   }
   else {
        // send message to another topic to rollback
   }
}
,

经纪人没有这方面的机制

您需要消费消息才能知道已经过去了多长时间。

假设,cleanup.policy=delete,如果您不使用保留窗口内的数据,该事件将不可用

另一种方法是让一个压缩的主题创建一个事件表,作为您的待处理/处理/处理/中止处理状态。定期获取所有挂起的任务并检查它们的时间戳是否超出您预期的窗口

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。