微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

EmbeddedKafka w/ ContainerTestUtils.waitForAssignment 抛出:预期为 1 但得到 0 个分区

如何解决EmbeddedKafka w/ ContainerTestUtils.waitForAssignment 抛出:预期为 1 但得到 0 个分区

我们有一个集成测试,我们使用 EmbeddedKafka 并向主题生成消息,我们的应用程序处理该消息,并将结果发送到我们使用和断言输出的第二个主题。在 CI 中,这可能有 2/3 的时间有效,但我们会遇到 KafkaTestUtils.getSingleRecord 抛出 java.lang.IllegalStateException: No records found for topic 的情况(参见下面的 [1])。

为了尝试解决这个问题,我为注册表中的每个侦听器容器添加ContainerTestUtils.waitForAssignment(请参阅下面的 [2])。在 CI 中成功运行了几次之后,我看到了一个新的异常:java.lang.IllegalStateException: Expected 1 but got 0 partitions。这让我想知道这是否真的是未找到记录的原始异常的根本原因。

有什么想法可以帮助解决这里的随机故障?如果您有任何有关如何进行故障排除的建议,我将不胜感激。

spring-kafka 和 spring-kafka-test v2.6.4。

编辑:添加newConsumer 以供参考。

我们的设置示例:

@SpringBoottest
@RunWith(springrunner.class)
@DirtiesContext
@EmbeddedKafka(
    topics = { "topic1","topic2" },partitions = 1,brokerProperties = {"listeners=PLAINTEXT://localhost:9099","port=9099"})
public class IntegrationTest {

  @Autowired
  private EmbeddedKafkabroker embeddedKafkabroker;

  @Autowired
  private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

  @Test
  public void testExample() {
    try (Consumer<String,String> consumer = newConsumer()) {
      for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
        [2]
        ContainerTestUtils.waitForAssignment(messageListenerContainer,embeddedKafkabroker.getPartitionsPerTopic());
      }

      try (Producer<String,String> producer = newProducer()) {
        embeddedKafkabroker.consumeFromAnEmbeddedTopic(consumer,"topic2"); // [1]

        producer.send(new ProducerRecord<>(
            "topic1","test payload"));
        producer.flush();
      }

      String result = KafkaTestUtils.getSingleRecord(consumer,"topic2").value();
      assertEquals(result,"expected result");
    }
  }

  private Consumer<String,String> newConsumer() {
    Map<String,Object> consumerProps = KafkaTestUtils.consumerProps("groupId","false",embeddedKafkabroker);
    ConsumerFactory<String,AssetTransferResponse> consumerFactory = new DefaultKafkaConsumerFactory<>(
        consumerProps,new StringDeserializer(),new CustomDeserializer<>());
    return consumerFactory.createConsumer();
  }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。