微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

反序列化到 OffsetDateTime 时,如何使 Spring Kafka JsonDeserializer 保留时区偏移量

如何解决反序列化到 OffsetDateTime 时,如何使 Spring Kafka JsonDeserializer 保留时区偏移量

我通过 Kafka 收到一条消息,我知道其中包含非 UTC 时区。 当我使用 org.apache.kafka.common.serialization.StringDeserializer 来验证这一点时,我得到了带有时区的 ISO 8601 格式的正确时间戳:

{  "id": "e499f2e8-a50e-4ff8-a9fe-0eaf9d3314bf","sent_ts": "2021-02-04T14:06:10+01:00" }

当我切换到 org.springframework.kafka.support.serializer.JsonDeserializer 时,这会丢失。我的 POJO 看起来像这样:

public class MyMessage {

    @JsonProperty("id")
    private String id;

    @JsonProperty("sent_ts")
    private OffsetDateTime sentTs;

    @Override
    public String toString() {
        return "MyMessage{" +
                "id='" + id + '\'' +
                ",sentTs=" + sentTs +
                '}';
}

当我记录收到的消息时,我得到:

MyMessage{id='e499f2e8-a50e-4ff8-a9fe-0eaf9d3314bf',sentTs=2021-02-04T13:06:10Z}

我认为 JsonDeserializer 必须使用 Jackson 所以在我设置的 application.yml 配置中:

spring.jackson:
    deserialization.ADJUST_DATES_TO_CONTEXT_TIME_ZONE: false

这没有用。我也试过定制器:

@Configuration
public class ObjectMapperBuilderCustomizer implements Jackson2ObjectMapperBuilderCustomizer {

    @Override
    public void customize(Jackson2ObjectMapperBuilder builder) {
        builder.modules(new JavaTimeModule());
        builder.featuresTodisable(DeserializationFeature.ADJUST_DATES_TO_CONTEXT_TIME_ZONE);
    }
}

这也不起作用。

我虽然可能需要成为 Kafka 消费者的属性,所以我也尝试过:

spring:
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.jackson.deserialization.ADJUST_DATES_TO_CONTEXT_TIME_ZONE: false

还是不行。

有没有办法让 JsonDeserializer 正常工作并保持正确的时区偏移?

解决方法

当你喜欢这个 value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer 时,该类的实例是由 Apache Kafka 客户端代码创建的,它完全不知道 Spring 配置。

如果您想依赖 Spring Boot 配置的 ObjectMapper 和您的自定义,您应该考虑执行以下操作:

@Bean
DefaultKafkaConsumerFactory kafkaConsumerFactory(KafkaProperties properties,ObjectMapper objectMapper) {
   Map<String,Object> consumerProperties = properties.buildConsumerProperties();
   JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>(objectMapper);
   jsonDeserializer.configure(consumerProperties,false);

   return new DefaultKafkaConsumerFactory(consumerProperties,new StringDeserializer(),jsonDeserializer);
}

注意我如何称呼jsonDeserializer.configure(consumerProperties,false);。这样,您仍然可以在 applicaiton.yml 中为 Kafka 消费者配置其余属性。

请考虑为 Spring Boot 提出 GH 问题,因此我们将修改我们如何处理 JsonDeserializer 和自动配置的 ObjectMapper,以提供更好的最终用户体验。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。