How to fix a Kafka consumer that cannot read Avro records from a topic (Avro deserialization error)
I am consuming from Kafka using Spring Kafka together with the Hortonworks Schema Registry. The Avro schema is defined in the registry; the producer validates each record against that schema and then publishes Avro records to the topic. On the consumer side I deserialize with the Hortonworks library:
implementation 'com.hortonworks.registries:schema-registry-serdes:0.3.0'
However, the records fail to deserialize on the consumer side. The deserializer in use is
com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer
The error I get gives no hint about anything being wrong with the data itself — nothing says the payload could not be parsed or deserialized. The exception is:
Caused by: java.lang.RuntimeException: com.google.common.util.concurrent.UncheckedExecutionException: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getSchemaVersionInfo(SchemaRegistryClient.java:503)
at com.hortonworks.registries.schemaregistry.serde.AbstractSnapshotDeserializer.deserialize(AbstractSnapshotDeserializer.java:150)
at com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:98)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1310)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3500(Fetcher.java:128)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1541)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1377)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:677)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:632)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1107)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1063)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:988)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.google.common.util.concurrent.UncheckedExecutionException: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
at com.google.common.cache.LocalCache.get(LocalCache.java:3849)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3873)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4798)
at com.hortonworks.registries.schemaregistry.SchemaVersionInfoCache.getSchema(SchemaVersionInfoCache.java:54)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getSchemaVersionInfo(SchemaRegistryClient.java:499)
... 18 more
Caused by: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
at org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:954)
at org.glassfish.jersey.client.JerseyInvocation.translate(JerseyInvocation.java:739)
at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:623)
at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
at org.glassfish.jersey.internal.Errors.process(Errors.java:205)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390)
at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:621)
at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:404)
at org.glassfish.jersey.client.JerseyInvocation$Builder.get(JerseyInvocation.java:300)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient$10.run(SchemaRegistryClient.java:756)
at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient$10.run(SchemaRegistryClient.java:753)
Consumer configuration:
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitFlag);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, conMaxPollRecords);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, conMinFetchBytes);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, conMaxWaitMS);
props.put("schema.registry.url", schemaRegistryUrl);
props.put("specific.avro.reader", true);
Two observations:
-
From the logs it looks as if the deserializer first fails to fetch the schema from the schema registry. But we have verified that a schema exists under the correct schema ID — the ID that was used to serialize the message — and the schema registry URL is correct.
-
Alternatively, the data produced to the topic could somehow be malformed, e.g. due to a bug in the producer code (a validation or serialization problem on the producer side). But in that case the stack trace should point at the line of code that actually performs the deserialization, somewhere inside the method below. This is the signature from com.hortonworks.registries.schemaregistry.serdes.avro.AbstractAvroSnapshotDeserializer:
protected Object buildDeserializedObject(byte protocolId, InputStream payloadInputStream, SchemaMetadata schemaMetadata, Integer writerSchemaVersion, Integer readerSchemaVersion) throws SerDesException
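On the first point: since the failure is an HTTP 500 coming back from the registry (not a connection error), it can help to probe the registry from the consumer host directly, outside the serdes code path. The sketch below is a plain-JDK connectivity check; the `/schemaregistry/schemas` suffix and the assumption that `schema.registry.url` already ends in `/api/v1` are taken from typical Hortonworks Registry setups and should be verified against your registry version.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

// Hedged sketch: check that the consumer host can reach the schema registry
// and see which HTTP status a simple read returns. A 500 here would confirm
// the problem is on the registry server, not in the consumer or the data.
public class RegistryCheck {

    // baseUrl is assumed to be the same value as the schema.registry.url
    // property (e.g. "http://host:9090/api/v1"); "/schemaregistry/schemas"
    // is assumed to be the Hortonworks Registry list-schemas endpoint --
    // verify the exact path against your registry version.
    public static int checkRegistry(String baseUrl) throws IOException {
        URL url = new URL(baseUrl + "/schemaregistry/schemas");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(5_000);
        conn.setReadTimeout(5_000);
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        try {
            // Hypothetical URL -- substitute your actual schema.registry.url.
            System.out.println("HTTP " + checkRegistry("http://localhost:9090/api/v1"));
        } catch (IOException e) {
            System.out.println("registry unreachable: " + e.getMessage());
        }
    }
}
```

If this returns 500 as well, the registry server logs (not the consumer) are the place to look next.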
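On the second point: one way to distinguish "bad payload" from "registry failure" is to consume the raw bytes with a ByteArrayDeserializer and peek at the header that `buildDeserializedObject` would read. The layout below (1-byte protocol id followed by a schema identifier) is my assumption about the Hortonworks serdes wire format based on its protocol-handler ids; check it against the schema-registry-serdes 0.3.0 sources before trusting the decoded values.

```java
import java.nio.ByteBuffer;

// Hedged sketch: decode the assumed Hortonworks serdes header from a raw
// Kafka record value. Assumed layout (verify against your serdes version):
//   protocol 1: [byte protocolId][long schemaMetadataId][int schemaVersion]
//   protocol 2: [byte protocolId][long schemaVersionId]
//   protocol 3: [byte protocolId][int schemaVersionId]
public class AvroHeaderPeek {

    public static String describeHeader(byte[] value) {
        ByteBuffer buf = ByteBuffer.wrap(value);
        byte protocolId = buf.get();
        switch (protocolId) {
            case 1: return "protocol=1 metadataId=" + buf.getLong() + " version=" + buf.getInt();
            case 2: return "protocol=2 versionId=" + buf.getLong();
            case 3: return "protocol=3 versionId=" + buf.getInt();
            default: return "unknown protocolId=" + protocolId
                    + " (payload may not be registry-serialized)";
        }
    }

    public static void main(String[] args) {
        // Synthetic example record header: protocol 1, metadataId 42, version 7.
        byte[] sample = ByteBuffer.allocate(13).put((byte) 1).putLong(42L).putInt(7).array();
        System.out.println(describeHeader(sample));
    }
}
```

If the decoded id or version does not match what the registry holds, the producer wrote against a different schema than expected; if it does match, the 500 from the registry lookup is the real problem.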