
How to fix: Confluent Schema Registry throws an Unauthorized (401) exception when running via spark-submit

I am trying to run a Spark DStream job that reads data from Kafka and deserializes it using the Confluent Avro Schema Registry. We configured Schema Registry basic authentication and passed the settings when creating KafkaUtils.createDirectStream:

val kafkaConfig = Map(
  "basic.auth.credentials.source" -> "URL",
  "key.deserializer" -> classOf[KafkaAvroDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "bootstrap.servers" -> "localhost:9092",
  "schema.registry.url" -> "http://username:password@localhost:8081"
)
val stream = KafkaUtils.createDirectStream[String, GenericData.Record](
  ssc,
  LocationStrategies.PreferConsistent,
  Subscribe[String, GenericData.Record](topics, kafkaConfig)
)


When I run it from IntelliJ it works as expected, but when I run it with the spark-submit command I get a Schema Registry Unauthorized error. Has anyone run into the same problem?
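For comparison, the Confluent deserializer can also take the credentials through its USER_INFO source instead of embedding them in the URL. A minimal sketch of that variant, assuming the standard Confluent serializer config keys; this was not part of the original run:

val kafkaConfig = Map(
  // credentials supplied separately rather than inside schema.registry.url
  "basic.auth.credentials.source" -> "USER_INFO",
  "basic.auth.user.info" -> "username:password",
  "key.deserializer" -> classOf[KafkaAvroDeserializer],
  "value.deserializer" -> classOf[KafkaAvroDeserializer],
  "bootstrap.servers" -> "localhost:9092",
  "schema.registry.url" -> "http://localhost:8081"
)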

Error trace:

21/02/21 16:03:42 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 8)
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition topic_name-2 at offset 110. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 161
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:230)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:486)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:479)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:177)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:256)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:235)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:107)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:79)
        at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
        at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1261)
        at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1488)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1328)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:641)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:602)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1294)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
        at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.poll(KafkaDataConsumer.scala:206)
        at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:135)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get(KafkaDataConsumer.scala:39)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer.get$(KafkaDataConsumer.scala:38)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer$NonCachedKafkaDataConsumer.get(KafkaDataConsumer.scala:224)
        at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:257)
        at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:225)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:512)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
        at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I am using Spark 3.0 and schema-registry 5.3.1.
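Since the deserialization (and therefore the 401) happens inside executor tasks, one way to narrow this down is to call the registry directly from an executor with the same credentials. A minimal diagnostic sketch, assuming ssc is the StreamingContext from the snippet above and the CachedSchemaRegistryClient API of schema-registry 5.3.1; schema id 161 is taken from the trace:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

ssc.sparkContext.parallelize(Seq(1), 1).foreach { _ =>
  // build the config inside the closure so nothing non-serializable is captured
  val conf = new java.util.HashMap[String, Any]()
  conf.put("basic.auth.credentials.source", "URL")
  val client = new CachedSchemaRegistryClient(
    "http://username:password@localhost:8081", 16, conf)
  // a 401 here reproduces the executor-side failure independently of Kafka
  println(client.getById(161))
}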
