如何将 Java 对象正确地返回给对应的 Kafka 生产者
我想参照这个示例(example),通过 Kafka 主题发送和接收序列化的 Java 对象。
我试过了:
生产者配置:
/**
 * Kafka wiring for the requesting side of the request/reply exchange.
 *
 * <p>Declares the producer factory and templates used to publish requests, plus the
 * consumer factory and listener container on which replies are received.
 */
@Configuration
public class KafkaProducerConfig {

    /** Topic the reply container subscribes to; also set as the reply template's default topic. */
    private static final String REPLY_TOPIC = "tp-sale.reply";

    /** Consumer group used by the reply consumer. */
    private static final String REPLY_GROUP_ID = "tp-sale.reply";

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /**
     * Builds the producer factory: String keys, values written by {@link ObjectFactorySerializer}.
     *
     * @return producer factory bound to the configured bootstrap address
     */
    @Bean
    public ProducerFactory<String, Object> requestFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ObjectFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Plain (fire-and-forget) template backed by {@link #requestFactoryProducerFactory()}.
     *
     * @return Kafka template for sending objects
     */
    @Bean
    public KafkaTemplate<String, Object> requestFactoryKafkaTemplate() {
        return new KafkaTemplate<>(requestFactoryProducerFactory());
    }

    /**
     * Builds the consumer factory used for replies: String keys, values read by
     * {@link ObjectFactoryDeserializer}.
     *
     * @return consumer factory bound to the configured bootstrap address
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, REPLY_GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ObjectFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Listener container factory that creates containers from {@link #consumerFactory()}.
     *
     * @return container factory for reply consumers
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * Request/reply template: sends through {@code producerFactory} and receives replies on a
     * container created by {@code factory} for the reply topic.
     *
     * @param producerFactory factory for the producer used to send requests
     * @param factory container factory used to create the reply listener container
     * @return replying template whose reply container listens on {@code tp-sale.reply}
     */
    @Bean
    public ReplyingKafkaTemplate<String, Object, Object> replyKafkaTemplate(
            ProducerFactory<String, Object> producerFactory,
            ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
        ConcurrentMessageListenerContainer<String, Object> replyContainer =
                factory.createContainer(REPLY_TOPIC);
        // ReplyingKafkaTemplate takes three type arguments: key, request value, reply value.
        ReplyingKafkaTemplate<String, Object, Object> requestReplyKafkaTemplate =
                new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
        requestReplyKafkaTemplate.setDefaultTopic(REPLY_TOPIC);
        return requestReplyKafkaTemplate;
    }
}
生产者:
@RestController
@RequestMapping("/checkout")
public class CheckoutController {
private static final Logger LOG = LoggerFactory.getLogger(CheckoutController.class);
private KafkaTemplate<String,Object> requestFactoryKafkaTemplate;
private ReplyingKafkaTemplate<String,Object> requestReplyKafkaTemplate;
@Autowired
public CheckoutController(KafkaTemplate<String,Object> requestFactoryKafkaTemplate,ReplyingKafkaTemplate<String,Object> requestReplyKafkaTemplate){
this.requestFactoryKafkaTemplate = requestFactoryKafkaTemplate;
this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
}
@PostMapping("sale_test")
public void performSaletest() throws ExecutionException,InterruptedException,TimeoutException {
SaleRequestFactory obj = new SaleRequestFactory();
obj.setId(100);
ProducerRecord<String,Object> record = new ProducerRecord<>("tp-sale.request",obj);
RequestReplyFuture<String,Object> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
SendResult<String,Object> sendResult = replyFuture.getSendFuture().get(10,TimeUnit.SECONDS);
ConsumerRecord<String,Object> consumerRecord = replyFuture.get(10,TimeUnit.SECONDS);
SaleResponseFactory value = (SaleResponseFactory) consumerRecord.value();
System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
}
@PostMapping("authorize_test")
public void performAuthtest() throws ExecutionException,TimeoutException {
AuthRequestFactory obj = new AuthRequestFactory();
obj.setId(140);
ProducerRecord<String,TimeUnit.SECONDS);
AuthResponseFactory value = (AuthResponseFactory) consumerRecord.value();
System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
}
}
ObjectFactoryDeserializer
/**
 * Kafka value deserializer that reads a payload with Java native serialization
 * ({@link ObjectInputStream}).
 *
 * <p>SECURITY NOTE(review): native Java deserialization of bytes from a topic is unsafe if
 * the topic can carry untrusted data. Consider JSON or another schema-based format, or
 * install an {@code ObjectInputFilter}, before using this outside a trusted environment.
 */
public class ObjectFactoryDeserializer implements Deserializer<Object> {

    /**
     * Deserializes {@code data} into the object it encodes.
     *
     * @param topic topic the record came from (unused)
     * @param data serialized bytes; may be {@code null}
     * @return the decoded object, or {@code null} when {@code data} is {@code null}
     * @throws UncheckedIOException if the byte stream cannot be read
     * @throws IllegalStateException if the encoded class is not on the classpath
     */
    @Override
    public Object deserialize(String topic, byte[] data) {
        if (data == null) {
            // A null payload has nothing to decode; avoid the NPE from ByteArrayInputStream.
            return null;
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /**
     * Header-aware overload; headers are ignored and decoding is delegated to
     * {@link #deserialize(String, byte[])} so both entry points behave the same.
     *
     * @param topic topic the record came from (unused)
     * @param headers record headers (unused)
     * @param data serialized bytes; may be {@code null}
     * @return the decoded object, or {@code null} when {@code data} is {@code null}
     */
    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        return deserialize(topic, data);
    }
}
ObjectFactorySerializer
/**
 * Kafka value serializer that writes the payload with Java native serialization
 * ({@link ObjectOutputStream}).
 */
public class ObjectFactorySerializer implements Serializer<Object> {

    /**
     * Serializes {@code data} to a byte array.
     *
     * <p>The original listing declared this signature twice (one copy returning
     * {@code null}), which does not compile; only the working implementation is kept.
     *
     * @param topic destination topic (unused)
     * @param data object to write
     * @return the bytes produced by {@link ObjectOutputStream#writeObject(Object)}
     * @throws UncheckedIOException if the object cannot be written, e.g. it is not serializable
     */
    @Override
    public byte[] serialize(String topic, Object data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(data);
            // Flush before snapshotting: ObjectOutputStream buffers block data internally.
            oos.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
消费者配置:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Bean
public ConsumerFactory<String,"tp-sale.request");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,ObjectFactoryDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ProducerFactory<String,Object> saleResponseFactoryProducerFactory() {
Map<String,ObjectFactorySerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setReplyTemplate(saleResponseFactoryKafkaTemplate());
return factory;
}
@Bean
public KafkaTemplate<String,Object> saleResponseFactoryKafkaTemplate() {
return new KafkaTemplate<>(saleResponseFactoryProducerFactory());
}
}
消费者
/**
 * Class-level Kafka listener on {@code tp-sale.request}. Each {@code @KafkaHandler} method
 * accepts one request type and returns a response object that is routed to
 * {@code tp-sale.reply} via {@code @SendTo}.
 */
@Component
@KafkaListener(id = "tp-sale.request", topics = "tp-sale.request")
public class ConsumerListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerListener.class);

    /**
     * Handles an authorization request.
     *
     * @param authRequestFactory the incoming request payload
     * @return a new response whose unique id is set to {@code "123123"}
     */
    @KafkaHandler
    @SendTo("tp-sale.reply")
    public AuthResponseFactory fooListener(AuthRequestFactory authRequestFactory) {
        System.out.println("In AuthRequestFactoryListener: " + authRequestFactory);

        final AuthResponseFactory authReply = new AuthResponseFactory();
        authReply.setUnique_id("123123");
        return authReply;
    }

    /**
     * Handles a sale request.
     *
     * @param saleRequestFactory the incoming request payload
     * @return a new response whose unique id is set to {@code "123123"}
     */
    @KafkaHandler
    @SendTo("tp-sale.reply")
    public SaleResponseFactory barListener(SaleRequestFactory saleRequestFactory) {
        System.out.println("In SaleRequestFactoryListener: " + saleRequestFactory);

        final SaleResponseFactory saleReply = new SaleResponseFactory();
        saleReply.setUnique_id("123123");
        return saleReply;
    }
}
完整的最小可运行示例(example)
当我调用端点 authorize_test 时,代码运行正常。
当我调用端点 sale_test 时,会收到如下异常:
生产者端异常:
14:06:48.706 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
14:06:48.706 [consumer-0-C-1] DEBUG RecoveringBatchErrorHandler[debug:200] - Expected a BatchListenerFailedException; re-seeking batch
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed,please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
14:06:48.707 [consumer-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed,please seek past the record to continue consumption.
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1372)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1070)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed,please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
14:06:48.970 [http-nio-8090-exec-3] ERROR HandlerExecutionChain[triggerAfterCompletion:192] - HandlerInterceptor.afterCompletion threw exception
java.lang.NullPointerException: null
org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
14:06:49.142 [http-nio-8090-exec-3] DEBUG dispatcherServlet[logResult:1101] - Failed to complete request: java.lang.InterruptedException
14:06:49.143 [http-nio-8090-exec-3] DEBUG HstsHeaderWriter[writeHeaders:169] - Not injecting HSTS header since it did not match the requestMatcher
14:06:49.149 [consumer-0-C-1] DEBUG RecoveringBatchErrorHandler[debug:200] - Expected a BatchListenerFailedException; re-seeking batch
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed,please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
14:06:49.149 [consumer-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed,please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
14:06:49.150 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
14:06:49.151 [consumer-0-C-1] DEBUG RecoveringBatchErrorHandler[debug:200] - Expected a BatchListenerFailedException; re-seeking batch
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed,please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
14:06:49.152 [consumer-0-C-1] ERROR KafkaMessageListenerContainer$ListenerConsumer[error:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition tp-sale.reply-0 at offset 3. If needed,please seek past the record to continue consumption.
Caused by: java.lang.ClassCastException: class org.engine.plugin.transactions.factory.SaleResponseFactory cannot be cast to class org.engine.plugin.transactions.factory.AuthResponseFactory (org.engine.plugin.transactions.factory.SaleResponseFactory and org.engine.plugin.transactions.factory.AuthResponseFactory are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @6267c3bb)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:20)
at org.engine.plugin.transactions.factory.ResponseFactoryDeserializer.deserialize(ResponseFactoryDeserializer.java:10)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1289)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1213)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1117)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:835)
14:06:49.157 [consumer-0-C-1] DEBUG KafkaMessageListenerContainer$ListenerConsumer[debug:313] - Commit list: {}
完整日志 https://pastebin.com/Z5XJCNhA
你知道我哪里做错了吗?我找不到问题所在。看起来是 requestReplyKafkaTemplate 没有正确配置。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。