@KafkaListener:通过传入参数生成clientId和groupId

如何解决@KafkaListener:通过传入参数生成clientId和groupId

我一直将@KafkaListeners 用于必须具有唯一 ID 的 Kafka 事件消费者,并且每个事件消费者必须有一个具有非常特殊名称的单独组。 例如:

@KafkaListener(id="AmazingProductEventConsumer",topics = "${kafka......topic}",clientIdPrefix = "AmazingProductEventConsumerClientId",groupId = "${kafka.group.id.prefix}-${environemnt.id}-${application.name}-${kafka.......topic}-#{T(java.util.UUID).randomUUID().toString()}")
public class AmazingProductEventConsumer {
   ... methods...
}

或使用批处理事件侦听器:

public class ProductBatchEventConsumer {

   @KafkaListener(id="ProductBatchEventConsumer ",clientIdPrefix = "ProductBatchEventConsumerClientId",groupId = "${kafka.group.id.prefix}-${environemnt.id}-${application.name}-${kafka.......topic}-#{T(java.util.UUID).randomUUID().toString()}")
   public void batchEventConsumer(List<Record> records) {
     .... 
   }
}

应用程序相当大,所以很多消费者订阅了同一个主题,但一般来说,在多个服务中大约有 500-600 个消费者订阅了大约 180-200 个主题。我想避免使用样板代码提取可用于生成这些参数的通用模式,主题和分区除外。

在 Spring Boot 2.2 中,我为此使用了一个单独的 BeanPostProcessor 并在初始化之前生成了所需的字段,并用需要的实例替换了消费者的 KafkaListener 注释,但在 Spring Boot 2.5.0 中,此功能不再可用,因为 KafkaListenerAnnotationBeanPostProcessor uses KafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(clazz,KafkaListener.class); 基于首次声明的注释属性合成新的 KafkaListeners。

我很好奇,关于如何生成@KafkaListener 的字段idclientIdgroupId 和其他字段,是否有任何合法的方法可以通过通用模式从其他侦听器中提取出来。

解决方法

您可以使用更简单的 SpEL。

groupId = "#{@someBean.groupId}"

并将占位符和 UUID 生成放在该 bean 的 getGroupId() 中。

并且您可以使用带有 @KafkaListener 元注释的自定义注释和您的道具以避免样板。

或者,如果您对每个侦听器(UUID 除外)有其他一些可变性,请使用

#{@someBean.groupId('${small.property}'})

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?