如何解决将矢量化列转换为 .tfrecord 文件的问题
我在将 pyspark 数据帧转换为 .tfrecord 文件时遇到问题,原因是通过 pySpark 中的 VectorAssembler 稀疏列。我正在考虑将 VectorUDT 列重新转换为字符串左右,但我希望它以更稳定的方式作为 TF 输入可读。 VectorType 实际上是支持的,但 VectorUDT 不是。这是转换试验后的错误输出:
Py4JJavaError:调用 o137.save 时发生错误。 : org.apache.spark.SparkException:作业中止。在 org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231) 在 org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188) 在 org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108) 在 org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106) 在 org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131) 在 org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) 在 org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) 在 org.apache.spark.rdd.RDDOOperationScope$.withScope(RDDOOperationScope.scala:151) 在 org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) 在 org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) 在 org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:132) 在 org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:131) 在 org.apache.spark.sql.DataFrameWriter.$anonfun$runcommand$1(DataFrameWriter.scala:989) 在 org.apache.spark.sql.execution.sqlExecution$.$anonfun$withNewExecutionId$5(sqlExecution.scala:103) 在 org.apache.spark.sql.execution.sqlExecution$.withsqlConfPropagated(sqlExecution.scala:163) 在 org.apache.spark.sql.execution.sqlExecution$.$anonfun$withNewExecutionId$1(sqlExecution.scala:90) 在 org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) 在 org.apache.spark.sql.execution.sqlExecution$.withNewExecutionId(sqlExecution.scala:64) 在 org.apache.spark.sql.DataFrameWriter.runcommand(DataFrameWriter.scala:989) 在 org.apache.spark.sql.DataFrameWriter.savetoV1Source(DataFrameWriter.scala:438) 在 org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415) 在 org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293) 在 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 方法)在 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.base/java.lang.reflect.Method.invoke(Method.java:566) 在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:238) 在 java.base/java.lang.Thread.run(Thread.java:829) 引起的: org.apache.spark.SparkException:由于阶段失败,作业中止: 阶段 13.0 中的任务 0 失败 1 次,最近失败:丢失任务 0.0 在阶段 13.0 (TID 18) (f2a1abf16740 executor driver): java.lang.RuntimeException: 无法将字段转换为不受支持的数据 输入 org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 在 com.linkedin.spark.datasources.tfrecord.TFRecordSerializer.newFeatureConverter(TFRecordSerializer.scala:147) 在 com.linkedin.spark.datasources.tfrecord.TFRecordSerializer.$anonfun$featureConverters$2(TFRecordSerializer.scala:14) 在 scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) 在 scala.collection.immutable.List.foreach(List.scala:392) 在 scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) 在 scala.collection.immutable.List.map(List.scala:298) 在 com.linkedin.spark.datasources.tfrecord.TFRecordSerializer.(TFRecordSerializer.scala:14) 在 com.linkedin.spark.datasources.tfrecord.TFRecordOutputWriter.(TFRecordOutputWriter.scala:24) 在 com.linkedin.spark.datasources.tfrecord.DefaultSource$$anon$1.newInstance(DefaultSource.scala:79) 在 org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126) 在 org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.(FileFormatDataWriter.scala:111) 在 org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:269) 在 org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 在 org.apache.spark.scheduler.Task.run(Task.scala:131) 在 org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) 在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 在 java.base/java.lang.Thread.run(Thread.java:829)
驱动程序堆栈跟踪:在 org.apache.spark.scheduler.DAGScheduler.failJobAndindependentStages(DAGScheduler.scala:2258) 在 org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207) 在 org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206) 在 scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 在 scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 在 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206) 在 org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079) 在 org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079) 在 scala.Option.foreach(Option.scala:407) 在 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376) 在 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) 在 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2196) 在 org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200) ... 32 更多 引起:java.lang.RuntimeException:无法转换 字段到不受支持的数据类型 org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 在 com.linkedin.spark.datasources.tfrecord.TFRecordSerializer.newFeatureConverter(TFRecordSerializer.scala:147) 在 com.linkedin.spark.datasources.tfrecord.TFRecordSerializer.$anonfun$featureConverters$2(TFRecordSerializer.scala:14) 在 scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) 在 scala.collection.immutable.List.foreach(List.scala:392) 在 scala.collection.TraversableLike.map(TraversableLike.scala:238) at scala.collection.TraversableLike.map$(TraversableLike.scala:231) 在 scala.collection.immutable.List.map(List.scala:298) 在 com.linkedin.spark.datasources.tfrecord.TFRecordSerializer.(TFRecordSerializer.scala:14) 在 com.linkedin.spark.datasources.tfrecord.TFRecordOutputWriter.(TFRecordOutputWriter.scala:24) 在 com.linkedin.spark.datasources.tfrecord.DefaultSource$$anon$1.newInstance(DefaultSource.scala:79) 在 org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126) 在 org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.(FileFormatDataWriter.scala:111) 在 org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:269) 在 org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 在 org.apache.spark.scheduler.Task.run(Task.scala:131) 在 org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) 在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ... 1个
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。