微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

用于 OnFailure 回调的 Kafka Junit

如何解决用于 OnFailure 回调的 Kafka Junit

我有以下代码将数据发送到 Kafka。我能够成功发送测试 OnSucess 回调,但我无法测试 onFailure() 方法

我收到以下错误

java.lang.classCastException: org.mockito.codegen.Throwable$MockitoMock$2137573915 不能转换为 org.springframework.kafka.core.KafkaProducerException

@Test
    void test() throws InterruptedException,ExecutionException {

         Throwable ex = mock(Throwable.class);
                 Employee employee = new Employee();

        when(kafkaTemplate.send(null,employee )).thenReturn(responseFuture);
            when(sendResult.getProducerRecord()).thenReturn(producerRecord);
            when(producerRecord.value()).thenReturn(employee);

            doAnswer(invocationOnMock -> {
                ListenableFutureCallback<SendResult<String,Employee>> listenableFutureCallback = invocationOnMock.getArgument(0);
                
                listenableFutureCallback.onFailure(ex);
               
                return null;
            }).when(responseFuture).addCallback(any(ListenableFutureCallback.class));
            
            kafkaSender.sendMessage(employee);
            
         }

@Service
public class KafkaSender{
    @Autowired
    private KafkaTemplate<String,Employee> kafkaTemplate;
    public void sendMessage(Employee employee) {

        ObjectMapper objectMapper = new ObjectMapper();

        ListenableFuture<SendResult<String,Employee>> listenableFuture = kafkaTemplate.send(topic,employee);

        listenableFuture.addCallback(new ListenableFutureCallback<SendResult<String,Employee>>() {

            @Override
            public void onSuccess(SendResult<String,Employee> result) {

                
                saveInDatabaseMethod(result.getProducerRecord()); // method to save in DB

            }

            @Override
            public void onFailure(Throwable ex) {

                                 // class cast exception occur here
                ProducerRecord<String,Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();


                saveInDatabaseMethod(producerRecord);

            }

            }

解决方法

ProducerRecord<String,Employee> producerRecord = ((KafkaProducerException) ex).getFailedProducerRecord();

你的模拟不是用 KPE 调用回调,而是用这个调用它

Throwable ex = mock(Throwable.class);

您需要将其包装在 KPE 中。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。