如何解决如何测试 Spring Kafka Streams
我使用这种方法编写了这个流应用程序:
StreamConfigs
类:
@Configuration
@EnableKafka
public class StreamConfigs {
@Value(...)
private String applicationId;
@Value(...)
private String bootstrapServer;
@Bean
public KafkaStreamsConfiguration streamsConfig() {
Map<String,Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONfig,applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONfig,bootstrapServer);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONfig,Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONfig,Serdes.String().getClass().getName());
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONfig,0);
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderfactorybean streamBuilder() {
return new StreamsBuilderfactorybean(streamsConfig());
}
@Bean
public StreamsBuilderfactorybean streamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderfactorybean(streamsConfig);
}
}
还有这个班
@Component
public class MyStream {
@Value(value = ...)
private String inputTopicA;
@Value(value = ...)
private String inputTopicB;
@Value(value = ...)
private String outputTopic;
public MyStream() {}
public MyStream(String inputTopicA,String inputTopicB,String outputTopic) {
this.inputTopicA = inputTopicA;
this.inputTopicB = inputTopicB;
this.outputTopic = outputTopic;
}
@Bean
public KStream<String,String> kStream(StreamsBuilder streamBuilder) {
KTable<String,String> aKTable = streamBuilder.table(inputTopicA);
KTable<String,String> bKTable = streamBuilder.table(inputTopicB);
KTable<String,String> outputKTable = aKTable
.join(bKTable,(a,b) -> {...})
.toStream()
.groupByKey()
.reduce((aggregate,current) -> {...});
KStream<String,String> stream = outputKTable.toStream();
stream.to(outputTopic);
return stream;
}
}
在 kStream()
类的 MyStream
方法中,有我想要测试的应用程序的流逻辑。
所以我写了这个类用于测试,使用嵌入式 kafka
@EmbeddedKafka(partitions = 1)
@SpringBoottest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyStreamApplicationTests {
@Value(...) private String topicA;
@Value(...) private String topicB;
@Value(...) private String outputTopic;
@Autowired
private EmbeddedKafkabroker embeddedKafkabroker;
private BlockingQueue<ConsumerRecord<String,String>> aTopicQueue;
private BlockingQueue<ConsumerRecord<String,String>> bTopicQueue;
private BlockingQueue<ConsumerRecord<String,String>> outputTopicQueue;
KafkaMessageListenerContainer<String,String> container;
@BeforeAll
void setupKafka() {
Map<String,Object> consumerConfigs = new HashMap<>(
KafkaTestUtils.consumerProps("consumer","true",embeddedKafkabroker)
);
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig,"earliest");
DefaultKafkaConsumerFactory<String,String> consumerFactory =
new DefaultKafkaConsumerFactory<>(consumerConfigs,new StringDeserializer(),new StringDeserializer());
ContainerProperties containerProperties = new ContainerProperties(aTopic,bTopic,outputTopic);
container = new KafkaMessageListenerContainer<>(consumerFactory,containerProperties);
//init record queues
aTopiccQueue = new LinkedBlockingQueue<>();
bTopicQueue = new LinkedBlockingQueue<>();
outputTopicQueue = new LinkedBlockingQueue<>();
container.setupMessageListener((MessageListener<String,String>) this::pushRecordIntoQueue);
container.start();
ContainerTestUtils.waitForAssignment(container,embeddedKafkabroker.getTopics().size() * embeddedKafkabroker.getPartitionsPerTopic());
}
@Test
void testStream() throws Exception {
//test logic of the stream
}
}
这是testStream()
方法
@Test
void testStream() throws Exception {
Map<String,Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONfig,"test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONfig,embeddedKafkabroker.getbrokersAsstring());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONfig,Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONfig,Serdes.String().getClass().getName());
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONfig,0);
KafkaStreamsConfiguration configuration = new KafkaStreamsConfiguration(props);
//ERROR HERE!!!
StreamsBuilder builder = new StreamConfigs().streamBuilder(configuration).getobject();
MyStream myStream = new MyStream(inputTopicA,inputTopicB,outputTopic);
myStream.kStream(builder);
//produce a record into inputTopicA
//produce a record into inputTopicB
ConsumerRecord<String,String> outputRecord = outputTopicQueue.take();
assertthat(outputRecord).isNotNull();
}
StreamsBuilder builder = new StreamConfigs().streamBuilder(configuration).getobject();
这是堆栈跟踪
org.springframework.beans.factory.factorybeannotinitializedException: org.springframework.kafka.config.StreamsBuilderfactorybean does not support circular references
at org.springframework.beans.factory.config.Abstractfactorybean.getEarlySingletonInstance(Abstractfactorybean.java:172)
at org.springframework.beans.factory.config.Abstractfactorybean.getobject(Abstractfactorybean.java:156)
at test.MyStreamApplicationTests.testStream(MyStreamApplicationTests.java:226)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.EngineExecutionorchestrator.execute(EngineExecutionorchestrator.java:108)
at org.junit.platform.launcher.core.EngineExecutionorchestrator.execute(EngineExecutionorchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionorchestrator.lambda$execute$0(EngineExecutionorchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionorchestrator.withInterceptedStreams(EngineExecutionorchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionorchestrator.execute(EngineExecutionorchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
at com.intellij.junit5.junit5IdeaTestRunner.startRunnerWithArgs(junit5IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
如何获得 StreamBuilder
实例以便将我的 kStream()
方法重用于测试和嵌入式代理?
如果我尝试使用 StreamBuilder
创建 new
实例,我的测试停留在等待我的机器上的活动代理上。
你能帮我吗?
编辑:
我认为这可以解决问题:
我以这种方式编辑 setupKafka()
类的 MyStreamApplicationTest
方法:
@BeforeAll
void setupKafka() {
...
Properties streamConfig = new Properties();
streamConfig.put(StreamsConfig.APPLICATION_ID_CONfig,"test");
streamConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONfig,embeddedKafkabroker.getbrokersAsstring());
streamConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONfig,Serdes.String().getClass().getName());
streamConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONfig,Serdes.String().getClass().getName());
streamConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONfig,0);
StreamsBuilder streamsBuilder = new StreamsBuilder();
MyStream myStream = new MergeStream(topicA,topicB,outputTopic);
myStream.process(streamsBuilder);
Topology topology = streamsBuilder.build();
new KafkaStreams(topology,streamConfig).start();
ContainerTestUtils.waitForAssignment(container,embeddedKafkabroker.getTopics().size() * embeddedKafkabroker.getPartitionsPerTopic());
}
解决方法
你不能那样做 - Spring 需要管理它。
只需将 bootstrapServersProperty = ...
添加到 @EmbeddedKafka
注释中,然后您的 bootstrapServer
中的 StreamConfigs
字段将被设置为嵌入式代理的地址。将其设置为您的 @Value
属性。
然后您可以简单地将工厂 bean 创建的 @Autowired
StreamBuilder
放入您的测试中。
此外,您只需要其中之一
@Bean
public StreamsBuilderFactoryBean streamBuilder() {
return new StreamsBuilderFactoryBean(streamsConfig());
}
@Bean
public StreamsBuilderFactoryBean streamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。