微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在 Spring Cloud Stream Kafka Binder 中应用死信队列的保留时间配置?

如何解决如何在 Spring Cloud Stream Kafka Binder 中应用死信队列的保留时间配置?

我有一个使用 Spring Cloud Stream Kafka 的应用程序。 对于用户定义的主题,我可以通过提供我在下面提到的配置来从指定的主题删除记录。但此配置不适用于 DLQ 主题

例如在下面的配置中,我在活页夹级别配置了保留时间。所以我在绑定级别下定义的生产者主题(学生主题)正确配置,我可以检查当主题日志超过指定的保留字节(300000000)时记录是否被删除

但是活页夹级别的保留时间不起作用 DLQ 主题(person-topic-error-dlq)。除了保留时间之外,是否有任何不同的配置来清除来自 DLQ 主题的记录。

我该怎么做?

spring:
  cloud:
    stream:
      kafka:
        bindings:
          person-topic-in:
            consumer:
              enableDlq: true
              dlqName: person-topic-error-dlq
      binders:
        defaultKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    default:
                      producer:
                        topic:
                          properties:
                            retention.bytes: 300000000
                            segment.bytes: 300000000
                    binder:
                      brokers: localhost:19092
      bindings:
        person-topic-in:
          binder: defaultKafka
          destination: person-topic
          contentType: application/json
          group: person-topic-group
        student-topic-out:
          binder: defaultKafka
          destination: student-topic
          contentType: application/json

解决方法

您只是为生产者绑定设置(默认)属性。

也就是说,这仍然对我不起作用:

      binders:
        defaultKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    default:
                      producer:
                        topic:
                          properties:
                            retention.bytes: 300000000
                            segment.bytes: 300000000
                      consumer:
                        topic:
                          properties:
                            retention.bytes: 300000000
                            segment.bytes: 300000000

(这些属性甚至不适用于主要主题)。

看起来默认的 kafka 消费者绑定属性有问题。

这对我有用;这些属性适用于主要和死信主题:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          person-topic-in:
            consumer:
              enableDlq: true
              dlqName: person-topic-error-dlq
              topic:
                properties:
                  retention.bytes: 300000000
                  segment.bytes: 300000000

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。