如何解决测试@KafkaListener的Spring Kafka错误处理程序时发生异常
我正在尝试测试 @KafkaListener 所使用的 kafkaListenerContainerFactory 中定义的错误处理程序。
根据被测侦听器抛出的异常类型,我配置了不同的重试次数。但是在第一次抛出异常并进入错误处理程序后,我收到了 IllegalStateException,因此在我编写的测试中没有进行重试。相同的代码在实际环境中可以正常工作。
这是我得到的异常:
Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException
我希望测试重试10次,然后在恢复回调(recoverer)中打印消息。但它没有重试,因为错误处理程序抛出了 IllegalStateException。
有人可以建议吗?
@Configuration
@EnableKafka
public class Config {
public static boolean seekPerformed;
public static int retries;
private Integer retryCount=10;
private Integer RetryCount2=5;
@Autowired
private KafkaTemplate<String,Object> kafkaTemplate;
@Spy
private errorCodes errorCodes;
@Bean
public ConcurrentKafkaListenerContainerFactory<String,Anky>kafkaListenerContainerFactory(EmbeddedKafkabroker embeddedKafka) {
ConcurrentKafkaListenerContainerFactory<String,Anky>factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(embeddedKafka));
factory.getContainerProperties().setDeliveryAttemptHeader(true);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler((record,exception) -> {
System.out.println(
"RetryPolicy** limit has been exceeded! You should really handle this better." + record.key());
});
errorHandler.setBackOffFunction((record,exception) -> {
retries++;
seekPerformed = true;
int maxRetryCount = retryCount+ retryCount2;
Anky msg = (Anky) record.value();
if (msg.getErrorCode.equals(getExceptionA())) {
return new FixedBackOff(0L,Long.valueOf(retryCount));
}
else {
return new FixedBackOff(0L,Long.valueOf(retryCount2));
}
});
errorHandler.setCommitRecovered(true);
factory.setErrorHandler(errorHandler);
factory.setConcurrency(2);
//errorHandler.setLogLevel(Level.INFO);
factory.setStatefulRetry(true);
return factory;
}
@Bean
public DefaultKafkaConsumerFactory<String,Anky> consumerFactory(EmbeddedKafkabroker embeddedKafka) {
//return new DefaultKafkaConsumerFactory<>(consumerConfigs(embeddedKafka));
return new DefaultKafkaConsumerFactory<>(consumerConfigs(embeddedKafka),new StringDeserializer(),new JsonDeserializer<>(EdealsMessage.class,false));
}
@Bean
public Map<String,Object> consumerConfigs(EmbeddedKafkabroker embeddedKafka) {
Map<String,Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,embeddedKafka.getbrokerAddress(0).toString());
props.put(ConsumerConfig.GROUP_ID_CONfig,"retry-grp");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONfig,false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig,"earliest");
//props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,new StringDeserializer());
//props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,false));
return props;
}
@Bean
public ProducerFactory<String,Object> testProducerFactory(EmbeddedKafkabroker embeddedKafka) {
Map<String,Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONfig,embeddedKafka.getbrokerAddress(0).toString());
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONfig,StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONfig,JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<String,Object> testKafkaTemplate(EmbeddedKafkabroker embeddedKafka) {
KafkaTemplate<String,Object> kafkaTemplate = new KafkaTemplate<>(testProducerFactory(embeddedKafka));
kafkaTemplate.setDefaultTopic("sr1");
return kafkaTemplate;
}
@KafkaListener(topics = "sr1",groupId = "retry-grp",containerFactory = "kafkaListenerContainerFactory")
public void listen1(ConsumerRecord<String,Anky> record,@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery) throws AppException{
try {
throw new AppException(//this is our custom exception in the application);
}
catch(AppException se) {
if(record.value().getNewErrorCode().equals(se.getErrorCode())) {
System.out.println("are you here?");
throw se;
}
}
}
/**
 * Integration test that publishes one record to {@code sr1} on an embedded broker and checks that the
 * listener failure reached the error handler's back-off function.
 */
@EnableKafka
@SpringBootTest
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = Config.class)
@EmbeddedKafka(
        partitions = 1, controlledShutdown = true, topics = {"sr1"}, brokerProperties = {
                "listeners=PLAINTEXT://localhost:3333", "port=3333"
        })
public class KafkaRetryTest {

    /** Upper bound, in milliseconds, on how long the test waits for the error handler to run. */
    private static final long SEEK_TIMEOUT_MS = 10_000L;

    /** Interval, in milliseconds, between checks of the flag set by the error handler. */
    private static final long POLL_INTERVAL_MS = 100L;

    @Autowired
    private KafkaTemplate<String, Object> template;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    EmbeddedKafkaBroker kafkaEmbedded;

    // NOTE(review): relies on the test framework initialising Mockito annotations on this instance - confirm.
    @Spy
    private ErrorCodes errorCodes;

    /** Blocks until every listener container has been assigned its partitions. */
    @BeforeEach
    public void setUp() throws Exception {
        for (MessageListenerContainer messageListenerContainer
                : kafkaListenerEndpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer, kafkaEmbedded.getPartitionsPerTopic());
        }
    }

    /** Sends a record that makes the listener throw and waits for the back-off function to be consulted. */
    @Test
    public void testStatefulRetry() throws Exception {
        Anky msg = new Anky();
        msg.setNewErrorCode(errorCodes.getExceptionA());
        // Wait for the broker to acknowledge the send before watching for consumption.
        this.template.send("sr1", "3323800", msg).get();

        // Consumption and error handling run on the container's consumer thread, so asserting right
        // after send() races with it; poll the flag until it flips or the timeout elapses.
        long deadline = System.currentTimeMillis() + SEEK_TIMEOUT_MS;
        while (!Config.seekPerformed && System.currentTimeMillis() < deadline) {
            Thread.sleep(POLL_INTERVAL_MS);
        }

        assertThat(Config.seekPerformed).isTrue();
        System.out.println("******" + Config.retries);
    }
}
*******更新:我无法在 @EmbeddedKafka 上使用 bootstrapServersProperty = "spring.kafka.bootstrap-servers",因为消费者一直尝试连接 localhost:9092 并超时。
这是我用于测试文件的配置:
/**
 * Test skeleton that starts an embedded broker with one partition for {@code test_topic}.
 */
@EnableKafka
@SpringBootTest(classes = MyConsumer.class)
@ExtendWith(SpringExtension.class)
@DirtiesContext
@ContextConfiguration(classes = AppConfig.class)
// NOTE(review): bootstrapServersProperty publishes the embedded broker address under
// "spring.kafka.bootstrap-servers". A consumer factory that takes its bootstrap servers from a
// different property (e.g. "kafka.bootstrapAddress") will not pick it up - confirm both use the same key.
@EmbeddedKafka(
        partitions = 1, topics = {"test_topic"}, bootstrapServersProperty = "spring.kafka.bootstrap-servers"
)
public class KafkaConsumerTest {
}
// Broker list for the consumer, resolved from the "kafka.bootstrapAddress" property.
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;

/**
 * Builds the consumer factory from the shared Kafka properties, then applies the explicit overrides.
 *
 * @return a consumer factory with a String key deserializer and a JSON value deserializer for Anky
 */
@Bean
public ConsumerFactory<String, Anky> consumerFactory() {
    Map<String, Object> props = kafkaProperties.buildConsumerProperties();
    // put() replaces any bootstrap-servers value already present in the map.
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    // Deserializer instances must be passed for both key and value.
    return new DefaultKafkaConsumerFactory<>(
            props, new StringDeserializer(), new JsonDeserializer<>(Anky.class, false));
}
解决方法
请显示 IllegalStateException 的完整堆栈跟踪。
> Seek to current after exception
这是正常现象;您可以通过设置错误处理程序的日志级别来抑制这条日志。
下面的代码在我这里按预期工作……
/**
 * Minimal Spring Boot application: a listener that always throws, a seek-to-current error handler
 * with a fixed back-off, and a runner that publishes a single record on startup.
 */
@SpringBootApplication
public class So64780994Application {

    public static void main(String[] args) {
        SpringApplication.run(So64780994Application.class, args);
    }

    /** Prints the payload and then fails, so the record is handed to the error handler. */
    @KafkaListener(id = "so64780994", topics = "so64780994")
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("test");
    }

    /** Declares the topic with one partition and one replica. */
    @Bean
    NewTopic topic() {
        return TopicBuilder.name("so64780994")
                .partitions(1)
                .replicas(1)
                .build();
    }

    /**
     * Error handler whose recoverer prints the failed record; every record gets a
     * {@code FixedBackOff(0L, 8)} and the handler logs at DEBUG.
     */
    @Bean
    ErrorHandler handler() {
        SeekToCurrentErrorHandler errorHandler = new SeekToCurrentErrorHandler(
                (failedRecord, cause) -> System.out.println("Retries exhausted:" + failedRecord));
        errorHandler.setBackOffFunction((failedRecord, cause) -> new FixedBackOff(0L, 8));
        errorHandler.setLogLevel(Level.DEBUG);
        return errorHandler;
    }

    /** Sends one record to the topic once the application has started. */
    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("so64780994", "foo");
    }
}
foo
2020-11-11 09:20:04.836 INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so64780994-1,groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
2020-11-11 09:20:05.334 INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so64780994-1,groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
2020-11-11 09:20:05.836 INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so64780994-1,groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
2020-11-11 09:20:06.338 INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so64780994-1,groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
2020-11-11 09:20:06.843 INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so64780994-1,groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
2020-11-11 09:20:07.347 INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so64780994-1,groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
2020-11-11 09:20:07.856 INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so64780994-1,groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
2020-11-11 09:20:08.361 INFO 65759 --- [o64780994-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so64780994-1,groupId=so64780994] Seeking to offset 2 for partition so64780994-0
foo
Retries exhausted:ConsumerRecord(topic = so64780994,partition = 0,leaderEpoch = 0,offset = 2,CreateTime = 1605104404708,serialized key size = -1,serialized value size = 3,headers = RecordHeaders(headers = [],isReadOnly = false),key = null,value = foo)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。