微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Kafka用户无法从主题读取AVRO记录 Avro反序列化错误

如何解决Kafka用户无法从主题读取AVRO记录 Avro反序列化错误

我在Kafka使用者中使用SpringKafka和Hortonworks架构注册表。我们已经在架构注册表中定义了Avro架构。在使用Avro架构验证记录并将记录发送到主题之后,生产者正在向Kafka主题生成Avro记录。 我正在使用Hortonworks库进行反序列化。

implementation 'com.hortonworks.registries:schema-registry-serdes:0.3.0'

但是这些记录在消费者端并没有反序列化。正在使用的反序列化器是

com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer

我得到的错误没有给出任何有关数据错误提示。就像没有告诉我们无法解析/反序列化任何归档的数据一样。遇到错误

Caused by: java.lang.RuntimeException: com.google.common.util.concurrent.UncheckedExecutionException: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
        at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getSchemaVersionInfo(SchemaRegistryClient.java:503)
        at com.hortonworks.registries.schemaregistry.serde.AbstractSnapshotDeserializer.deserialize(AbstractSnapshotDeserializer.java:150)
        at com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:98)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541)
        at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1107)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1063)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
        at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
        at com.google.common.cache.LocalCache.get(LocalCache.java:3849)
        at com.google.common.cache.LocalCache.getorLoad(LocalCache.java:3873)
        at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4798)
        at com.hortonworks.registries.schemaregistry.SchemaVersionInfoCache.getSchema(SchemaVersionInfoCache.java:54)
        at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getSchemaVersionInfo(SchemaRegistryClient.java:499)
        ... 18 more
Caused by: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
        at org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:954)
        at org.glassfish.jersey.client.JerseyInvocation.translate(JerseyInvocation.java:739)
        at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:623)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
        at org.glassfish.jersey.internal.Errors.process(Errors.java:205)
        at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390)
        at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:621)
        at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:404)
        at org.glassfish.jersey.client.JerseyInvocation$Builder.get(JerseyInvocation.java:300)
        at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient$10.run(SchemaRegistryClient.java:756)
        at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient$10.run(SchemaRegistryClient.java:753)

消费者配置:

Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONfig,bootstrapAddress);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONfig,org.apache.kafka.common.serialization.StringDeserialize.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONfig,com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONfig,autoCommitFlag);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONfig,autoOffsetReset);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONfig,conMaxPollRecords); 
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONfig,conMinFetchBytes);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONfig,conMaxWaitMS);
        props.put("schema.registry.url",schemaRegistryUrl);
        props.put("specific.avro.reader",true);

这里有两点:

  1. 从日志看来,Deserializer首先无法从架构注册表中获取架构。但是我们检查了模式是否存在正确的schemaID,该ID用于序列化消息。架构注册表网址正确。

  2. 以某种方式产生的主题数据可能格式不正确,例如生产者代码中存在错误(生产者的验证和序列化问题),但是在这种情况下,异常跟踪应指向行号代码实际上在进行反序列化,如下面的方法中的某处。下面的代码是com.hortonworks.registries.schemaregistry.serdes.avro.AbstractAvroSnapshotDeserializer的代码段。

    受保护的对象buildDeserializedobject(byte protocolId,InputStream payloadInputStream, SchemaMetadata schemaMetadata,Integer writerSchemaVersion,Integer readerSchemaVersion) 引发SerDesException

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?