微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Pyspark GCP UnsupportedOperationException:org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary

如何解决Pyspark GCP UnsupportedOperationException:org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary

我是 pyspark 的新手,所以希望有人可以提供帮助。我正在尝试读取存储在 GCP 存储桶上的镶木地板文件。该文件按日期分区,例如 bucket-name/year={}/month={}/day={}

对于给定的文件,我们有以下架构描述:

  1. 直到 3 月,我们曾经在 float 数据类型
  2. 中有列 x 和 y
  3. 自 3 月以来,这 2 列现在处于双数据类型

据我所知,pyspark 在评估float 和double 数据类型是否兼容数据类型方面没有问题。 (我在网上找到的类似这个错误的例子与不兼容的数据类型有关,例如字符串和浮点数) 然而,我们正面临这个奇怪的问题,如果我们尝试读取此文件的所有可用数据:

#i.e. read all the data we have ever received for this file
 path = 'bucket-name/year=*/month=*/day=*' 

df = spark.read.format('parquet').load(path)
df.cache().count()

我们得到以下错误。 (请注意,如果我们执行 df.count(),我们不会收到此错误,只有在我们先缓存时才会遇到)

添加到来自 spark.read 的结果架构提到列 x 的数据类型为浮点数。所以模式明智,spark 很乐意读入数据并说 dtype 是浮点数。但是,如果我们缓存,事情就会变糟。

希望情况的细节足够清楚:)

An error occurred while calling o923.count. :
org.apache.spark.SparkException: Job aborted due to stage failure:
Task 15 in stage 41.0 Failed 4 times,most recent failure: Lost task
15.3 in stage 41.0 (TID 13228,avroconversion-validation-w-1.c.vf-gned-nwp-live.internal,executor
47): java.lang.UnsupportedOperationException:
org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
    at
org.apache.parquet.column.Dictionary.decodetoFloat(Dictionary.java:53)
    at
org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodetoFloat(ParquetDictionary.java:41)
    at
org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getFloat(OnHeapColumnVector.java:423)
    at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(UnkNown
Source)     at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
    at
org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anon$1.hasNext(InMemoryRelation.scala:125)
    at
org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:221)
    at
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1165)
    at
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at
org.apache.spark.storage.BlockManager.getorElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getorCompute(RDD.scala:357)     at
org.apache.spark.rdd.RDD.iterator(RDD.scala:308)    at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)     at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)     at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.$anonfun$getorCompute$1(RDD.scala:359)
    at
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1165)
    at
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
    at
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
    at
org.apache.spark.storage.BlockManager.getorElseUpdate(BlockManager.scala:882)
    at org.apache.spark.rdd.RDD.getorCompute(RDD.scala:357)     at
org.apache.spark.rdd.RDD.iterator(RDD.scala:308)    at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)     at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)     at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)     at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)     at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)  at
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

error_snippet

解决方法

根据documentation

cache() 方法是使用默认存储级别的简写, 这是 StorageLevel.MEMORY_ONLY(将反序列化的对象存储在 记忆)

cache() 是一个惰性操作,如果您查看 MEMORY_ONLY 部分,您会注意到 cache() 尝试将 RDD/DataFrame 存储为 JVM 中的反序列化 Java 对象[一旦您调用操作on cached RDD/DataFrame] 所以你在反序列化你的 RDD/DataFrame 中的对象时遇到了问题。 我建议尝试执行一些像 map() 这样的转换来检查序列化/反序列化是否正常

如果您在 df.count() 中调用 df 而没有任何转换,spark 不会反序列化您的对象

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。