微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用嵌入式 kafka + 自定义序列化测试 Reactive-Kafka 消费者和生产者模板

如何解决使用嵌入式 kafka + 自定义序列化测试 Reactive-Kafka 消费者和生产者模板

我们需要一个关于如何使用 ReactiveKafkaConsumerTemplate 测试 ReactiveKafkaProducerTemplateembedded-kafka-broker 的示例。谢谢。

正确的代码在讨论之后

您可以相应地使用自定义 de-serializer 以使用自定义 ReactiveKafkaConsumerTemplate

自定义序列化器:


import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class EmployeeSerializer implements Serializer<Employee> {

    @Override
    public byte[] serialize(String topic,Employee data) {
        
        byte[] rb = null;
        ObjectMapper mapper = new ObjectMapper();
        try {
            rb = mapper.writeValueAsstring(data).getBytes();
        } catch (JsonProcessingException e) {
            e.printstacktrace();
        }
        return rb;
    }

}

将其用作嵌入式 kfka-reactive 测试的一部分:


import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.json.JsonSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.kafka.support.converter.MessagingMessageConverter;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.test.StepVerifier;

@EmbeddedKafka(topics = EmbeddedKafkareactiveTest.REACTIVE_INT_KEY_TOPIC,brokerProperties = { "transaction.state.log.replication.factor=1","transaction.state.log.min.isr=1" })
public class EmbeddedKafkareactiveTest {

    public static final String REACTIVE_INT_KEY_TOPIC = "reactive_int_key_topic";

    private static final Integer DEFAULT_KEY = 1;

    private static final String DEFAULT_VERIFY_TIMEOUT = null;

    private ReactiveKafkaProducerTemplate<Integer,Employee> reactiveKafkaProducerTemplate;

    @BeforeEach
    public void setUp() {
        reactiveKafkaProducerTemplate = new ReactiveKafkaProducerTemplate<>(setupSenderOptionsWithDefaultTopic(),new MessagingMessageConverter());
    }

    private SenderOptions<Integer,Employee> setupSenderOptionsWithDefaultTopic() {
        Map<String,Object> senderProps = KafkaTestUtils
                .producerProps(EmbeddedKafkaCondition.getbroker().getbrokersAsstring());
        SenderOptions<Integer,Employee> senderOptions = SenderOptions.create(senderProps);
        senderOptions = senderOptions.producerProperty(ProducerConfig.TRANSACTIONAL_ID_CONfig,"reactive.transaction")
                .producerProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONfig,true)
                .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig,JsonSerializer.class.getName())
                ;
        return senderOptions;
    }

    @Test
    public void test_When_Publish() {
        
        
        Employee employee = new Employee();
        
        ProducerRecord<Integer,Employee> producerRecord = new ProducerRecord<Integer,Employee>(REACTIVE_INT_KEY_TOPIC,DEFAULT_KEY,employee);
                
        StepVerifier.create(reactiveKafkaProducerTemplate.send(producerRecord)
                .then())
                .expectComplete()
                .verify();
    }   

    @AfterEach
    public void tearDown() {
        reactiveKafkaProducerTemplate.close();
    }
}

解决方法

框架中的测试使用嵌入式 kafka 代理。

https://github.com/spring-projects/spring-kafka/tree/main/spring-kafka/src/test/java/org/springframework/kafka/core/reactive

@EmbeddedKafka(topics = ReactiveKafkaProducerTemplateIntegrationTests.REACTIVE_INT_KEY_TOPIC,partitions = 2)
public class ReactiveKafkaProducerTemplateIntegrationTests {
...
,

使用非事务性生产者添加了正确的序列化。请参阅本页顶部的代码以获取答案。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?