如何解决KafkaListener 监听多个topic时,可以优先考虑单个topic吗?
当我的@KafkaListener 侦听多个主题时,有谁知道如何优先考虑单个 kafka 主题?
下面是我的代码示例:
@KafkaListener(id = "priority",topics = { "${prio-topic}" },concurrency = "1",autoStartup = "true")
@KafkaListener(id = "nonPriority",topics = { "${not-prio-topic-1}","${not-prio-topic-2}","${not-prio-topic-3}","${not-prio-topic-4}",autoStartup = "true")
public synchronized void listenManyEntryTopic(String message) {}
我的问题是我想在其他 nonPrio 主题之前阅读主题 prio-topic
。只有当我的 prio-topic
为空时,我才应该开始使用其他主题,而没有任何特定顺序。
任何提示/建议表示赞赏。
感谢您的帮助!
解决方法
kafka 中没有区分优先级和非优先级主题消息的功能。为了解决您的问题,解决方案之一是将优先处理主题与非优先主题分开,即专用 app1 仅消耗优先主题消息并处理它们,同时 app2 消耗非优先消息并处理它们。
,由于您有两个侦听器容器,您可以将 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic
作为参数添加到您的方法中。
然后,当您收到来自优先主题的消息时,您可以stop()
其他侦听器容器(使用 KafkaListenerEndpointRegistry
bean),如果它正在运行。
在主主题容器中配置 idleEventInterval
并为 @EventListner
(或 ListenerContainerIdleEvent
bean)添加 ApplicationListener
方法。
然后,当您检测到空闲的主容器时,您可以重新启动非主容器。
编辑
@SpringBootApplication
public class So66366140Application {
private static final Logger LOG = LoggerFactory.getLogger(So66366140Application.class);
public static void main(String[] args) {
SpringApplication.run(So66366140Application.class,args);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("so66366140-1").partitions(1).replicas(1).build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("so66366140-2").partitions(1).replicas(1).build();
}
@Autowired
KafkaListenerEndpointRegistry registry;
@KafkaListener(id = "so66366140-1",topics = "so66366140-1")
@KafkaListener(id = "so66366140-2",topics = "so66366140-2",autoStartup = "false")
public void listen(String in,@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
LOG.info(in);
if (topic.equals("so66366140-1")
&& this.registry.getListenerContainer("so66366140-2").isRunning()) {
LOG.info("Stopping non-pri container");
this.registry.getListenerContainer("so66366140-2").stop();
}
}
@EventListener
void events(ListenerContainerIdleEvent event) {
LOG.info(event.toString());
if (event.getListenerId().startsWith("so66366140-1")
&& !this.registry.getListenerContainer("so66366140-2").isRunning()) {
LOG.info("Starting non-pri container");
this.registry.getListenerContainer("so66366140-2").start();
}
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String,String> template) {
return args -> {
IntStream.range(0,10).forEach(i -> {
template.send("so66366140-1","foo");
template.send("so66366140-2","bar");
try {
Thread.sleep(6_000);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
};
}
}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.idle-event-interval=5s
您也可以使用暂停/恢复而不是停止/开始。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。