如何解决KafkaStreams:自定义 JsonSerge
我有一段代码,在SpringBoot应用中实现KafkaStream工作:
public Consumer<KStream<String,ContentModel>> stream() {
return in -> in.groupByKey(Grouped.with(Serdes.String(),new JsonSerde<>(ContentModel.class)))
}
private ContentModel mergeObjects(final ContentModel reducer,final ContentModel materialized) {
return reducer;
}
问题是我的应用程序在其他情况下使用自己的 ObjectMapper bean(例如,使用 json-content 读取文件)。由于 JsonSerge 还包含“内部”自己的 ObjectMapper,因此 2 个 ObjectMapper 的实例开始“喧嚣”。
我正在寻找一种方法来在两侧散布 2 个 objectMapper。
为此,我尝试自定义在 KafkaStream 处理中使用的 JsonSerge:
在配置中:
@Bean
public JsonSerde<ContentModel> jsonSerde() {
ObjectMapper kafkaObjectMapper = new ObjectMapper();
return new JsonSerde<>(ContentModel.class,kafkaObjectMapper);
}
在课堂上:
@Autowired
private JsonSerde<ContentModel> jsonSerde;
public Consumer<KStream<String,jsonSerde))
.reduce(this::mergeObjects,Materialized.as(readMessagesstore.getStoreName()));
}
但是,我收到以下错误:
java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR.
at org.apache.kafka.streams.KafkaStreams.validateIsRunning(KafkaStreams.java:294) ~[kafka-streams-2.3.1.jar!/:?]
at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1066) ~[kafka-streams-2.3.1.jar!/:?]
我做错了什么?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。