
Unable to use a Spark.NET UDF on an HDInsight cluster

How to fix being unable to use a Spark.NET UDF on an HDInsight cluster

I am trying to run a simple application in a prod environment, containing the code from https://github.com/dotnet/spark/blob/master/examples/Microsoft.Spark.CSharp.Examples/Sql/Batch/Basic.cs. The application runs fine and emits output to stdout until it hits the first UDF, at which point it crashes with the error below. Any insight you can share is appreciated.
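For context, the failure happens at the first UDF call, i.e. at a step like the following (a minimal sketch in the spirit of the linked Basic.cs; the input path and the lambda are illustrative, not the exact example code):

using System;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.Functions;

class Program
{
    static void Main()
    {
        SparkSession spark = SparkSession.Builder().GetOrCreate();
        // Reading and showing the DataFrame works; the age/name table below comes from here.
        DataFrame df = spark.Read().Json("people.json");
        df.Show();

        // Udf<TIn, TOut> serializes the lambda together with a reference to the
        // assembly that defines it; the worker must locate that assembly to
        // deserialize the UDF, which is where the NullReferenceException in
        // UdfSerDe is thrown when the assembly cannot be found.
        Func<Column, Column> greet = Udf<string, string>(name => $"{name} is a name");
        df.Select(greet(df["name"])).Show();  // crashes here on the cluster
    }
}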

Env. The code was packaged with:

dotnet publish -c Release -f netcoreapp3.1 -r ubuntu.16.04-x64

HDInsight cluster, HDI 4.0 with Spark 2.4. The cluster was set up following the guide at https://docs.microsoft.com/en-us/dotnet/spark/tutorials/hdinsight-deployment, and the job is submitted with:
spark-submit --master yarn --conf spark.yarn.appMasterEnv.DOTNET_ASSEMBLY_SEARCH_PATHS="./app/publish.zip" --archives wasbs://xxx@yyy.blob.core.windows.net/SparkJobs/publish.zip#mySparkApp --class org.apache.spark.deploy.dotnet.DotnetRunner wasbs://xxx@yyy.blob.core.windows.net/SparkJobs/microsoft-spark-2.4.x-0.12.1.jar wasbs://xxx@yyy.blob.core.windows.net/SparkJobs/publish.zip mySparkApp

(I also tried various variants of this: --deploy-mode cluster, different paths, and so on; none of them worked.)

stdout: ...

+---+-----+
|age| name|
+---+-----+
| 22|Ricky|
| 36| Jeff|
| 62|Geddy|
+---+-----+

[2020-10-28T09:15:10.1478641Z] [wn0-hdinsi] [Error] [JvmBridge] JVM method execution failed: Nonstatic method 'showString' failed for class '41' when called with 3 arguments ([Index=1, Type=Int32, Value=20], [Index=2, [Index=3, Type=Boolean, Value=False], )
[2020-10-28T09:15:10.1480587Z] [wn0-hdinsi] [Error] [JvmBridge] org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 4 times, most recent failure: Lost task 0.3 in stage 16.0 (TID 210, wn0-hdinsi.xwccrqijnmqujdjghwrza0nzbb.fx.internal.cloudapp.net, executor 2): org.apache.spark.api.python.PythonException: System.NullReferenceException: Object reference not set to an instance of an object.
at Microsoft.Spark.Utils.UdfSerDe.<>c.b__10_0(TypeData td) in /_/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs:line 262
at System.Collections.Concurrent.ConcurrentDictionary`2.GetOrAdd(TKey key, Func`2 valueFactory)
at Microsoft.Spark.Utils.UdfSerDe.DeserializeType(TypeData typeData) in /_/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs:line 258
at Microsoft.Spark.Utils.UdfSerDe.Deserialize(UdfData udfData) in /_/src/csharp/Microsoft.Spark/Utils/UdfSerDe.cs:line 160
at Microsoft.Spark.Utils.CommandSerDe.DeserializeUdfs[T](UdfWrapperData data, Int32& nodeIndex, Int32& udfIndex) in /_/src/csharp/Microsoft.Spark/Utils/CommandSerDe.cs:line 333
at Microsoft.Spark.Utils.CommandSerDe.Deserialize[T](Stream stream, SerializedMode& serializerMode, SerializedMode& deserializerMode, String& runMode) in /_/src/csharp/Microsoft.Spark/Utils/CommandSerDe.cs:line 306
at Microsoft.Spark.Worker.Processor.CommandProcessor.ReadSqlCommands(PythonEvalType evalType, Stream stream) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Processor\CommandProcessor.cs:line 188
at Microsoft.Spark.Worker.Processor.CommandProcessor.ReadSqlCommands(PythonEvalType evalType, Stream stream, Version version) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Processor\CommandProcessor.cs:line 98
at Microsoft.Spark.Worker.Processor.CommandProcessor.Process(Stream stream) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Processor\CommandProcessor.cs:line 43
at Microsoft.Spark.Worker.Processor.PayloadProcessor.Process(Stream stream) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\Processor\PayloadProcessor.cs:line 82
at Microsoft.Spark.Worker.TaskRunner.ProcessStream(Stream inputStream, Stream outputStream, Version version, Boolean& readComplete) in D:\a\1\s\src\csharp\Microsoft.Spark.Worker\TaskRunner.cs:line 143
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

------ cut -------- Indeed, my problem really was a path issue. For anyone else hitting the same problem: I got it working by passing the DLL that contains the UDFs (it can be the same DLL the general Spark app uses) via --files. So basically you need the zip file with the assemblies, plus a direct link to the DLL. There may be a smarter way, but this worked for me (when running in cluster mode):

spark-submit --deploy-mode cluster --master yarn --files wasbs://xxx@yyy.blob.core.windows.net/SparkJobs/mySparkApp.dll --class org.apache.spark.deploy.dotnet.DotnetRunner wasbs://xxx@yyy.blob.core.windows.net/SparkJobs/microsoft-spark-2.4.x-0.12.1.jar wasbs://xxx@yyy.blob.core.windows.net/SparkJobs/publish.zip mySparkApp

Solution

The error occurs because the DLL that contains your code cannot be found.

Two things. First, in yarn mode, the ./ at the start of DOTNET_ASSEMBLY_SEARCH_PATHS causes the user's home directory to be put at the front of the path, so the value resolves relative to the home directory rather than to currentdirectory/app/publish.zip; if the two differ, the worker looks in the wrong place.

Second, make sure that publish.zip does not contain a folder and that the DLL with the UDFs sits at the top level of the zip.
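For instance, one way to build such a flat zip from the output of the dotnet publish command above (a sketch; the path is the default publish output for that command, and the zip/unzip tools are assumed to be available):

cd bin/Release/netcoreapp3.1/ubuntu.16.04-x64/publish
zip -r ../publish.zip .
unzip -l ../publish.zip

The listing should show mySparkApp.dll and its dependencies directly at the root of the archive, not under a publish/ or app/ subfolder.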

Rather than putting the zip in an app folder, I would use the current folder and not worry about DOTNET_ASSEMBLY_SEARCH_PATHS at all.

For a walkthrough, make sure you follow:

https://docs.microsoft.com/en-us/dotnet/spark/tutorials/hdinsight-deployment

