如何解决NPE 使用嵌入式 Kafka 测试 Kafka Producer
我编写了一个基本的 Spring Boot 服务,它通过 rest API 消耗一些数据并将其发布到 rabbitmq 和 kafka。
为了测试处理 kafka 生成的服务类,我遵循了本指南:https://www.baeldung.com/spring-boot-kafka-testing
单独而言,测试 (KafkaMessagingServiceImpltest) 在 intellij 理念和命令行上通过 mvn 完美运行。在idea中运行所有项目测试工作正常。但是,当我在命令行上通过 Maven 运行所有项目测试时,该测试在尝试对有效负载字符串进行断言时失败并显示 NPE。
我已将根本问题的位置缩小到另一个测试类 (AppPropertiesTest),该类仅测试我的 AppProperties 组件(这是我用来以整洁的方式从 application.properties 提取配置的组件)。当且仅当该测试类中的测试与使用项目根目录中的“mvn clean install”失败的测试一起运行时,NPE 才会显示。注释掉此类中的测试或使用 @DirtiesContext 对其进行注释可解决此问题。显然,此测试类加载到 spring 上下文中的某些内容会导致其他测试中事件/倒计时的时间/顺序出现问题。当然,我不想使用@DirtiesContext,因为随着项目复杂性的增加,它会导致构建速度变慢。它也不能解释问题..我无法处理:)
AppPropertiesTest 使用构造函数注入来注入 AppProperties 组件。它还扩展了一个抽象类“GenericServiceTest”,注释为:
@SpringBoottest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
并且不包含任何其他内容。您可能知道,SpringBoottest 注释在样板中构建了一个测试 spring 上下文和连线,以允许对 spring 应用程序的依赖注入等进行有效测试,而 TestConstructor 注释允许在我的一些测试中进行构造函数注入。 FWIW,我尝试删除 TestConstructor 注释并在 AppProperties 类中使用普通的旧自动装配来查看它是否有所不同,但没有。
失败的测试类还扩展了 GenericServiceTest,因为它需要 spring 上下文来注入一些依赖项,例如消费者和被测试的消息服务以及其中的 AppProperties 实例等。
所以我知道问题出在哪里,但我不知道问题出在哪里。即使使用 NPE 测试失败,我也可以在日志中看到消费者在失败前成功消费了消息,根据 Baeldung 指南:
TestKafkaConsumer : received payload='ConsumerRecord(topic = test-kafka-topic,partition = 0,leaderEpoch = 0,offset = 0,CreateTime = 1618997289238,serialized key size = -1,serialized value size = 43,headers = RecordHeaders(headers = [],isReadOnly = false),key = null,value = This is a test message to be sent to Kafka.)'
然而,当我们回到断言时,payLoad 为空。我在失败的测试中尝试了各种各样的东西,比如 Thread.sleep() 来给它更多的时间,我增加了 await() 超时但没有任何乐趣。
我觉得奇怪的是,这些测试在 IDEA 和隔离中都很好。现在它开始让我有点抓狂,我无法调试它,因为问题没有出现在我的 IDE 中。
如果有人有任何想法,将不胜感激!
谢谢。
失败测试(在 assertTrue(payload.contains(testMessage)) 处失败,因为 payLoad 为空)。自动装配的 kafkaMessagingService 只是注入了 AppProperties 和 KakfaTemplate 的依赖项并调用了 kafkaTemplate.send():
@EmbeddedKafka(partitions = 1,brokerProperties = { "listeners=PLAINTEXT://localhost:9092","port=9092" })
class KafkaMessagingServiceImpltest extends GenericServiceTest {
@Autowired
@Qualifier("kafkaMessagingServiceImpl")
private IMessagingService messagingService;
@Autowired
private TestKafkaConsumer kafkaConsumer;
@Value("${app.topicName}")
private String testTopic;
@Test
public void testSendAndConsumeKafkaMessage() throws InterruptedException {
String testMessage = "This is a test message to be sent to Kafka.";
messagingService.sendMessage(testMessage);
kafkaConsumer.getLatch().await(2000,TimeUnit.MILLISECONDS);
String payload = kafkaConsumer.getPayload();
assertTrue(payload.contains(testMessage));
}
TestConsumer(用于上面测试中的消费)
@Component
public class TestKafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumer.class);
private CountDownLatch latch = new CountDownLatch(1);
private String payload = null;
@KafkaListener(topics = "${app.topicName}")
public void receive(ConsumerRecord<?,?> consumerRecord) {
LOGGER.info("received payload='{}'",consumerRecord.toString());
setPayload(consumerRecord.toString());
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
public String getPayload() {
return payload;
}
public void setPayload(String payload) {
this.payload = payload;
}
项目依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.mockito/mockito-all -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.5.6.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
AppPropertiesTest 类(其上下文似乎导致问题)
class AppPropertiesTest extends GenericServiceTest {
private final AppProperties appProperties;
public AppPropertiesTest(AppProperties appProperties) {
this.appProperties = appProperties;
}
@Test
public void testAppPropertiesGetQueueName() {
String expected = "test-queue";
String result = appProperties.getRabbitMQQueueName();
assertEquals(expected,result);
}
@Test
public void testAppPropertiesGetDurableQueue() {
boolean isDurableQueue = appProperties.isDurableQueue();
assertTrue(isDurableQueue);
}
}
AppPropertiesTest 类正在测试的 AppProperties 类:
@Component
@ConfigurationProperties("app")
public class AppProperties {
// a whole bunch of properties by name that are prefixed by app. in the application.properties file. nothing else
}
两个测试都扩展的通用服务测试类。
@SpringBoottest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
public abstract class GenericServiceTest {
}
失败(您可以在上面的行中看到有效负载已接收并打印出来)。
2021-04-21 14:15:07.113 INFO 493384 --- [ntainer#0-0-C-1] service.TestKafkaConsumer : received payload='ConsumerRecord(topic = test-kafka-topic,CreateTime = 1619010907076,value = This is a test message to be sent to Kafka.)'
[ERROR] Tests run: 1,Failures: 0,Errors: 1,Skipped: 0,Time elapsed: 3.791 s <<< FAILURE! - in
service.KafkaMessagingServiceImpltest
[ERROR] testSendAndConsumeKafkaMessage Time elapsed: 2.044 s <<< ERROR!
java.lang.NullPointerException
at service.KafkaMessagingServiceImpltest.testSendAndConsumeKafkaMessage(KafkaMessagingServiceImpltest.java:42)
解决方法
问题在于 TestListener
是一个 @Component
,因此它被添加了两次 - 记录将转到另一个实例。
我添加了更多调试来验证在不同实例上调用 getter。
@Component
public class TestKafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumer.class);
private final CountDownLatch latch = new CountDownLatch(1);
private String payload = null;
@KafkaListener(id = "myListener",topics = "${app.kafkaTopicName}")
public void receive(ConsumerRecord<?,?> consumerRecord) {
LOGGER.info("received payload='{}'",consumerRecord.toString());
setPayload(consumerRecord.toString());
if (payload != null) {
LOGGER.info(this + ": payload is not null still");
}
latch.countDown();
if (payload != null) {
LOGGER.info(this + ": payload is not null after latch countdown");
}
}
public CountDownLatch getLatch() {
return latch;
}
public String getPayload() {
LOGGER.info(this + ": getting Payload");
return payload;
}
public void setPayload(String payload) {
this.payload = payload;
}
}
如果您不想使用 @DirtiesContext
,您至少可以在测试完成后停止侦听器容器:
@SpringBootTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
public abstract class GenericDataServiceTest {
@AfterAll
static void stopContainers(@Autowired KafkaListenerEndpointRegistry registry) {
registry.stop();
}
}
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。