如何解决使用 EmbeddedKafka Spring 进行测试时侦听器不使用消息
我试图查看当消费者收到来自 kafka 主题的消息但测试未通过并且消费者甚至没有收到消息时是否调用了服务。
我的测试:
@SpringBoottest
@DirtiesContext
@EmbeddedKafka(partitions = 1,brokerProperties = { "listeners=PLAINTEXT://localhost:9092","port=9092" })
class ConsumerTest {
@Autowired
Consumer consumer;
@Mock
private Service service;
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Test
public void givenEmbeddedKafkabroker_whenSendingtoSimpleProducer_thenMessageReceived()
throws Exception {
String message = "hi";
kafkaTemplate.send("topic",message);
consumer.getLatch().await(10000,TimeUnit.MILLISECONDS);
verify(service,times(1)).addMessage();
}
}
主包中的消费者是一个普通消费者,带有@KafkaListener(topics = "topic")。 然后我有一个配置文件:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public Map<String,Object> consumerConfigs() {
Map<String,Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,"localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONfig,"group");
return props;
}
@Bean
public ConsumerFactory<String,String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),new StringDeserializer(),new StringDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
也在 application.properties(在测试包内)我把这个:
春天: 卡夫卡: 消费者: 自动偏移重置:最早 组 ID:组
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。