如何解决Spring-Kafka:反序列化kafka消息时出现问题-类不在“可信任的程序包”中吗?
您可以通过以下方式将软件包列入白名单assessmentAttemptDetailsEntityConsumerFactory()
:
@Bean
public ConsumerFactory<String, AssessmentAttemptDetailsEntity> assessmentAttemptDetailsEntityConsumerFactory() {
JsonDeserializer<AssessmentAttemptDetailsEntity>
deserializer = new JsonDeserializer<>();
deserializer.addTrustedPackages("com.lte.assessment.assessments");//your package
return new DefaultKafkaConsumerFactory(config,deserializer);
}
解决方法
我得到以下异常,因为我从一个项目生产而消费者从另一个项目消费。我怎样才能解决这个问题。显然,包不一样。因此,如何确保有正确的json序列化。
The class 'com.lte.assessment.assessments.AssessmentAttemptRequest' is not in the trusted packages: [java.util,java.lang,com.lte.assessmentanalytics.model
消费者配置
@EnableKafka
@Configuration
public class KafkaConfig {
static Map<String,Object> config = new HashMap();
static {
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"group_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,JsonDeserializer.class);
}
@Bean
public ConsumerFactory<String,AssessmentAttemptRequest> assessmentAttemptDetailsEntityConsumerFactory() {
JsonDeserializer<AssessmentAttemptRequest> deserializer = new JsonDeserializer<>();
deserializer.addTrustedPackages("com.lte.assessment.assessments");
return new DefaultKafkaConsumerFactory(config,new StringDeserializer(),deserializer);
}
}
生产者配置
@Configuration
public class KafkaConfiguration {
@Bean
public ProducerFactory producerConfig() {
Map<String,Object> config = new HashMap();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,JsonSerializer.class);
return new DefaultKafkaProducerFactory(config);
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate(producerConfig());
}
@Bean
public ConcurrentKafkaListenerContainerFactory aaKafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String,AssessmentAttemptDetailsEntity> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(assessmentAttemptDetailsEntityConsumerFactory());
return factory;
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。