
How do I pass an array column as a parameter to a VectorUdf in .NET for Apache Spark?

I am trying to implement a Vector UDF in .NET for Apache Spark (C#).

I created my .NET Spark environment by following the .NET for Apache Spark guide. Vector UDFs (both the Apache Arrow and the Microsoft.Data.Analysis flavors) work for my IntegerType column. Now I am trying to pass an integer-array column to a Vector UDF and cannot find a way to do it.

Usings

using System;
using System.Linq;
using Microsoft.Data.Analysis;
using Microsoft.Spark.Sql;
using func = Microsoft.Spark.Sql.Functions;
using DataFrame = Microsoft.Spark.Sql.DataFrame;
using Arrow = Apache.Arrow;

Program

SparkSession spark = SparkSession
    .Builder()
    .AppName("sample")
    .GetOrCreate();

// Build a DataFrame with a LongType "id" column, then add an
// ArrayType(IntegerType) column via a row-based UDF.
DataFrame dataFrame = spark.Range(0, 100).Repartition(4);

Func<Column, Column> array20 = func.Udf<int, int[]>(
    (col1) => Enumerable.Range(0, col1).ToArray());

dataFrame = dataFrame.WithColumn("array", array20(dataFrame["id"]));

// Apache Arrow flavor: operates on a whole Arrow array per batch.
var arrowVectorUdf = ArrowFunctions.VectorUdf<Arrow.UInt64Array, Arrow.Int64Array>((id) =>
{
    var int64Builder = new Arrow.Int64Array.Builder();
    var count = id.Length;
    foreach (var item in id.Data.Children)
    {
        int64Builder.Append(item.Length + count);
    }
    return int64Builder.Build();
});

// Microsoft.Data.Analysis flavor: operates on a whole column per batch.
var dataFrameVector = DataFrameFunctions.VectorUdf<Int64DataFrameColumn, Int64DataFrameColumn>((id) => id + id.Length);

Works

            dataFrame = dataFrame.WithColumn("arrowVectorUdfId",arrowVectorUdf(dataFrame["id"]));

            dataFrame = dataFrame.WithColumn("dataFrameVectorId",dataFrameVector(dataFrame["id"]));

Does not work

            dataFrame = dataFrame.WithColumn("arrowVectorUdf",arrowVectorUdf(dataFrame["array"]));

            dataFrame = dataFrame.WithColumn("dataFrameVector",dataFrameVector(dataFrame["array"]));

The UDFs work if I pass the "id" column instead of the "array" column. I am not sure what the parameter type of the UDF should be for the "array" column (see the sketch after the stack trace below). The code above produces the same error, shown below, for both Apache.Arrow and Microsoft.Data.Analysis:

 [2021-03-25T07:02:05.9218517Z] [LAPTOP-0S8GNQ52] [Error] [TaskRunner] [0] Exiting with exception: System.IO.InvalidDataException: Arrow primitive 'List' is unsupported.
   at Apache.Arrow.Ipc.MessageSerializer.GetFieldArrowType(Field field)
   at Apache.Arrow.Ipc.MessageSerializer.GetSchema(Schema schema)
   at Apache.Arrow.Ipc.ArrowStreamReaderImplementation.<ReadSchema>b__12_0(Memory`1 buff)
   at Apache.Arrow.ArrayPoolExtensions.RentReturn(ArrayPool`1 pool, Int32 length, Action`1 action)
   at Apache.Arrow.Ipc.ArrowStreamReaderImplementation.ReadRecordBatch()
   at Microsoft.Spark.Worker.Command.ArrowBasedCommandExecutor.<GetInputIterator>d__2.MoveNext()
   at Microsoft.Spark.Worker.Command.ArrowOrDataFrameSqlCommandExecutor.ExecuteDataFrameSqlCommand(Stream inputStream, Stream outputStream, SqlCommand[] commands)
   at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Version version, Boolean& readComplete)
   at Microsoft.Spark.Worker.TaskRunner.Run()
[2021-03-25T07:02:05.9245061Z] [LAPTOP-0S8GNQ52] [Info] [TaskRunner] [0] Finished running 0 task(s).
[2021-03-25T07:02:05.9249567Z] [LAPTOP-0S8GNQ52] [Info] [SimpleWorker] RunSimpleWorker() finished successfully
21/03/25 12:32:05 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.IllegalArgumentException
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
        at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:669)
        at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
        at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:163)
        at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:169)
        at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:160)
        at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:62)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:89)
        at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:49)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
21/03/25 12:32:06 ERROR TaskSetManager: Task 0 in stage 4.0 failed 1 times; aborting job
21/03/25 12:32:06 ERROR DotnetBackendHandler: Failed to execute 'showString' on 'org.apache.spark.sql.Dataset' with args=([Type=java.lang.Integer, Value: 20],[Type=java.lang.Integer,[Type=java.lang.Boolean, Value: false])
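
For reference, the "Arrow primitive 'List' is unsupported" line above is thrown while the .NET worker reads the incoming Arrow schema, before the UDF body ever runs, so on this version no parameter type will make either vector UDF accept the "array" column. On an Apache.Arrow build that does support list vectors, the natural parameter type would presumably be Arrow.ListArray. The following is only a sketch under that assumption; the summing logic and column names are illustrative, not from the original post:

// Sketch only: assumes a worker/Apache.Arrow version that can deserialize
// Arrow List vectors (the version in the stack trace above cannot).
var arrowListUdf = ArrowFunctions.VectorUdf<Arrow.ListArray, Arrow.Int32Array>((arrays) =>
{
    var sums = new Arrow.Int32Array.Builder();
    for (int i = 0; i < arrays.Length; i++)
    {
        // GetSlicedValues(i) exposes the i-th list entry as its own Arrow array.
        var values = (Arrow.Int32Array)arrays.GetSlicedValues(i);
        int sum = 0;
        for (int j = 0; j < values.Length; j++)
        {
            sum += values.GetValue(j) ?? 0;
        }
        sums.Append(sum);
    }
    return sums.Build();
});

dataFrame = dataFrame.WithColumn("arraySumVector", arrowListUdf(dataFrame["array"]));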

Solution

Both of your code samples work for me. I created the Spark environment the same way you did; the only difference is that the environment did not work for me with the bundled Hadoop 2.7, so I installed Hadoop 2.7.4 separately.
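
If rebuilding the environment is not an option, a possible fallback (an assumption on my part, not part of the answer above) is to process the array column with a plain row-based UDF instead of a vector UDF, since the pickling serializer used by regular Udf calls handles ArrayType columns, at the cost of per-row serialization:

// Workaround sketch: a plain (non-vectorized) UDF receives the ArrayType
// column as int[]; the Sum() logic and column name are illustrative only.
Func<Column, Column> sumArray = func.Udf<int[], int>((arr) => arr.Sum());

dataFrame = dataFrame.WithColumn("arraySum", sumArray(dataFrame["array"]));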
