微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

为什么有时Kafka消息队列在编写集成测试时不返回消息?

如何解决为什么有时Kafka消息队列在编写集成测试时不返回消息?

我正在尝试在我的应用程序中为 Kafka 消息传递编写集成测试。当我运行我的测试时,它们有时会通过,有时会失败。当它们失败时,测试类中的以下行无法返回任何消息。我不确定为什么它有时会返回消息,有时则不会。非常感谢任何帮助解决。所有的配置都在 yml 文件中。

Map<String,MessageResponse> message = messageResponseTestListener.getMessages(); 

制作人

@KafkaClient
public interface MessageRequestTestProducer {

     @Topic("${kafka.messageRequestTopic}")
     Single<RecordMetadata> sendMessageRequest(@KafkaKey String id,MessageRequest messageRequest);
} 

消费者

@Singleton
@KafkaListener(groupId = "test",offsetReset = OffsetReset.EARLIEST)
public class MessageResponseTestListener {

    public CountDownLatch getLatch() {
        return latch;
    }

    private CountDownLatch latch = new CountDownLatch(1);

    private Map<String,MessageResponse> messages = new HashMap<>();

    @Topic("${kafka.messageResponsetopic}")
    public Single<MessageResponse> eventOccurred(@KafkaKey String key,Single<MessageResponse> recordSingle) {
        return recordSingle.doOnSuccess(record -> {
                    messages.put(key,record);
                    latch.countDown();
                }
        );

    }

    public Map<String,MessageResponse> getMessages() {
        return messages;
    }
}

KafkaBaseTest

public class KafkaTestBase implements TestPropertyProvider{

protected static KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));

@Override
public Map<String,String> get()
{
    return null;
}

@NonNull
@Override
public Map<String,String> getProperties()
{
    Map<String,String> propertyOverrideMap = new HashMap();
    propertyOverrideMap.put("kafka.bootstrap.servers",kafka.getBootstrapServers());

    return propertyOverrideMap;
}

static {
    kafka.start();
}
}

测试类

@Inject
MessageRequestTestProducer messageRequestTestProducer;

@Inject
MessageResponseTestListener messageResponseTestListener;

@Test
void test() throws InterruptedException {

    TEST("Message Testing Kafka version");

    String id = "123";
    MessageRequest messageRequest = buildCorrectRequest();
    messageRequestTestProducer.sendMessageRequest(id,messageRequest).blockingGet();

    messageResponseTestListener.getLatch().await(2,TimeUnit.SECONDS);
    WHEN("Message event received");

    Map<String,MessageResponse> message = messageResponseTestListener.getMessages();

    THEN("Message is produced");

    assertTrue(message.containsKey(id));
    assertEquals(buildExpectedMessageResponse(),message.get(cdrId));

}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。