如何解决使用 EmbeddedKafka
我正在尝试使用 Springboot 和 EmbeddedKafka 为单元测试设置一个类。 我有两个主题,topicA 和 topicB,我将测试消息生成到 topicA 和 topicB。
这是我的课程:
@EmbeddedKafka()
@SpringBoottest
class ApplicationTests {
private String topicA = "A";
private String topicB = "B";
@Autowired
private EmbeddedKafkabroker embeddedKafkabroker;
BlockingQueue<ConsumerRecord<String,String>> topicAContent;
BlockingQueue<ConsumerRecord<String,String>> topicBContent;
KafkaMessageListenerContainer<String,String> container;
@BeforeEach
void setup() {
Map<String,Object> consumerConfigs = new HashMap<>(
KafkaTestUtils.consumerProps("consumer","true",embeddedKafkabroker)
);
DefaultKafkaConsumerFactory<String,String> consumerFactory =
new DefaultKafkaConsumerFactory<>(consumerConfigs,new StringDeserializer(),new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties(topicA,topicB);
container = new KafkaMessageListenerContainer<>(consumerFactory,containerProperties);
topicAContent = new LinkedBlockingQueue<>();
topicBContent = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String,String>) this::pushRecord);
container.start();
ContainerTestUtils.waitForAssignment(container,embeddedKafkabroker.getPartitionsPerTopic());
}
private void pushRecord(ConsumerRecord<String,String> record) {
String topic = record.topic();
if(topic.equals(topicA)) {
topicAContent.add(record);
}
else if(topic.equals(topicB)) {
topicBContent.add(record);
}
}
@Test
public void produceIntoTopicA() {
Map<String,Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkabroker));
Producer<String,String> producer = new DefaultKafkaProducerFactory<>(configs,new StringSerializer(),new StringSerializer()).createProducer();
producer.send(new ProducerRecord<>(topicA,"a","Hello A"));
producer.flush();
ConsumerRecord<String,String> singleRecord = topicAContent.poll(100,TimeUnit.MILLISECONDS);
assertthat(singleRecord).isNotNull();
assertthat(singleRecord.key()).isEqualTo("a");
assertthat(singleRecord.value()).isEqualTo("Hello A");
}
@Test
public void produceIntoTopicB() {
Map<String,new StringSerializer()).createProducer();
producer.send(new ProducerRecord<>(topicB,"b","Hello B"));
producer.flush();
ConsumerRecord<String,String> singleRecord = topicBContent.poll(100,TimeUnit.MILLISECONDS);
assertthat(singleRecord).isNotNull();
assertthat(singleRecord.key()).isEqualTo("b");
assertthat(singleRecord.value()).isEqualTo("Hello B");
}
}
现在,如果我运行测试,produceIntoTopicB 测试将失败并显示此错误:
java.lang.IllegalStateException: Expected 1 but got 2 partitions
at org.springframework.kafka.test.utils.ContainerTestUtils.waitForSingleContainerAssignment(ContainerTestUtils.java:115)
at org.springframework.kafka.test.utils.ContainerTestUtils.waitForAssignment(ContainerTestUtils.java:51)
at it.test.ApplicationTests.setup(ApplicationTests.java:92)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:126)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeAllMethod(TimeoutExtension.java:68)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllMethods$9(ClassBasedTestDescriptor.java:384)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllMethods(ClassBasedTestDescriptor.java:382)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:196)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:78)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:136)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.EngineExecutionorchestrator.execute(EngineExecutionorchestrator.java:108)
at org.junit.platform.launcher.core.EngineExecutionorchestrator.execute(EngineExecutionorchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionorchestrator.lambda$execute$0(EngineExecutionorchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionorchestrator.withInterceptedStreams(EngineExecutionorchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionorchestrator.execute(EngineExecutionorchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
at com.intellij.junit5.junit5IdeaTestRunner.startRunnerWithArgs(junit5IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
java.lang.AssertionError:
Expecting actual not to be null
我错在哪里?
解决方法
对于第二个,你需要
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
默认情况下是 latest
,因此存在竞争(尽管 waitForAssignment()
应该可以防止这种情况发生,请尝试 DEBUG 日志记录)。
对于第一个,编辑问题以显示完整的堆栈跟踪。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。