如何解决 Scala Spark RDD 错误:无法将 java.lang.invoke.SerializedLambda 的实例分配给字段 org.apache.spark.rdd.MapPartitionsRDD.f
Spark 版本:3.0.1;Scala 版本:2.12.10
由于我不是生活在英语世界,如果语法不正确,请谅解。
我是 Scala 和 Spark 的初学者。
我想绘制数据分布的直方图,如下图所示。
所以,我想通过将数据分成 10 个部分并计算开始范围、结束范围和数量来创建一个数据集。
(1) SparkSession 连接代码
// Application-wide holder for the shared SparkSession.
object SparkResource {
  // Build (or reuse) a session attached to the standalone master.
  // FIX: the original called `.getorCreate()` — no such method exists on
  // SparkSession.Builder; the correct name is `.getOrCreate()`.
  implicit val oSparkOrigin = SparkSession.builder()
    .master("spark://centos-master:7077")
    // .master("local[*]")
    .appName("spark-api")
    .getOrCreate()
  // The currently active session — same instance as oSparkOrigin.
  implicit val oSpark = SparkSession.active
}
/**
 * Loads a dataset from HDFS, registers and caches it as a temp view named
 * `<userno>_<groupId>_<jobId>`, computes a 10-bucket histogram over the
 * `open` column, and returns a JSON summary of the data.
 *
 * @param pSpark    active SparkSession (same as SparkResource.oSpark)
 * @param pFilename source file name on HDFS
 * @param pFiletype file type tag (must be non-null)
 * @param pFileseq  file sequence id (must be non-null)
 * @param pUserno   user number, part of the temp-view name
 * @param pGroupId  group id, part of the temp-view name
 * @param pJobId    job id, part of the temp-view name
 * @return Future with the first 20 rows as JSON, the schema fields, the
 *         temp-view name, and the total row count
 */
def handleDataFromHDFS(
    pSpark: SparkSession, pFilename: String, pFiletype: String, pFileseq: String,
    pUserno: String, pGroupId: String, pJobId: String)(implicit mc: MarkerContext): Future[ResponseResource] = {
  require(pFiletype != null, "filetype must be not null.")
  require(pFileseq != null, "pFileseq must be not null.")
  apiService.serviceDataFromHDFS(pSpark, pFilename, pFiletype, pFileseq, pUserno).map { sData =>
    // sData is a Dataset[Row]; register and cache it under a per-job name.
    val sName = pUserno + "_" + pGroupId + "_" + pJobId
    sData.createOrReplaceTempView(sName)
    pSpark.sqlContext.cacheTable(sName)
    sData.show()

    // FIX: `max`/`min` come from org.apache.spark.sql.functions, which the
    // original import list (col, column, expr) did not include.
    import org.apache.spark.sql.functions.{col, column, expr, max, min}
    import pSpark.implicits._

    // Min/max of `open` in a single aggregation pass.
    // FIX: the original called `.todouble` — no such method; it is `.toDouble`.
    val sMinMaxRow = sData.agg(max($"open"), min($"open")).head()
    val sMaxValue = sMinMaxRow(0).toString.toDouble
    val sMinValue = sMinMaxRow(1).toString.toDouble
    println("=sMaxValue=")
    println(sMaxValue)
    println("=sMinValue=")
    println(sMinValue)

    // 11 equal-width bucket edges: `until ... by step` yields the 10 lower
    // edges; the max value is appended as the closing edge.
    val step = (sMaxValue - sMinValue) / 10
    val thresholds: Array[Double] =
      (sMinValue until sMaxValue by step).toArray ++ Array(sMaxValue)
    thresholds.foreach(println)

    // Histogram of `open` over the custom bucket edges.
    // NOTE(review): the `SerializedLambda ... MapPartitionsRDD.f` exception
    // seen on the standalone master is not a defect in this lambda — it means
    // the executors cannot load the application's classes. Distribute the
    // assembly jar to the cluster, e.g. `.config("spark.jars", "<app.jar>")`
    // on the session builder, or submit via spark-submit with the jar.
    val _tmpHist = sData
      .select($"open" cast "double")
      .rdd.map(r => r.getDouble(0))
      .histogram(thresholds)

    // Rows of (from, to, value): each edge zipped with its successor and
    // the bucket count. `thresholds.tail` shifts the edges by one.
    val histogram = pSpark.sparkContext
      .parallelize((thresholds, thresholds.tail, _tmpHist).zipped.toList)
      .toDF("from", "to", "value")
    // histogram.show()

    ResponseResource("select", Json.toJson(sData.limit(20).toJSON.collect()),
      Json.parse(sData.schema.json)("fields"), sName, sData.count(), 0)
  }
}
(3) sbt
name := """spark-api-test"""
organization := "com.baeldung"
version := "1.0-SNAPSHOT"
lazy val root = (project in file(".")).enablePlugins(PlayScala)
// Must match the Scala version Spark 3.0.1 was built for (2.12.x).
scalaVersion := "2.12.10"
resolvers += "jitpack" at "https://jitpack.io"
libraryDependencies += guice
libraryDependencies += "org.scalatestplus.play" %% "scalatestplus-play" % "5.0.0" % Test
// FIX: the Maven group ID is lowercase "mysql"; "MysqL" does not resolve.
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.41"
// Spark modules — keep all three on the same version as the cluster (3.0.1).
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.1"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "3.0.1"
libraryDependencies += "org.joda" % "joda-convert" % "2.2.1"
libraryDependencies += "net.logstash.logback" % "logstash-logback-encoder" % "6.2"
libraryDependencies += "io.lemonlabs" %% "scala-uri" % "1.5.1"
libraryDependencies += "net.codingwell" %% "scala-guice" % "4.2.6"
libraryDependencies += "com.crealytics" %% "spark-excel" % "0.13.6"
libraryDependencies += "com.github.shin285" % "KOMORAN" % "3.3.4"
// hdfs save lib
libraryDependencies += "com.github.mrpowers" %% "spark-daria" % "1.0.0"
// https://mvnrepository.com/artifact/org.scala-lang/scala-compiler
libraryDependencies += "org.scala-lang" % "scala-compiler" % "2.12.10"
addCompilerPlugin("com.github.aoiroaoino" %% "totuple" % "0.1.2")
enablePlugins(JavaAppPackaging)
// (4) The exact expression that triggers the exception below, repeated from
// the handler above for reference (fragment — `sData` and `thresholds` are
// defined in handleDataFromHDFS): map the DataFrame's `open` column to an
// RDD[Double] and call DoubleRDDFunctions.histogram with custom bucket edges.
val _tmpHist = sData
.select($"open" cast "double")
.rdd.map(r => r.getDouble(0))
.histogram(thresholds)
(5) 异常
Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 12, 192.168.0.220, executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setobjFieldValues(ObjectStreamClass.java:2301)
at java.io.ObjectStreamClass.setobjFieldValues(ObjectStreamClass.java:1431)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2410)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
at java.io.ObjectInputStream.readobject0(ObjectInputStream.java:1666)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
at java.io.ObjectInputStream.readobject0(ObjectInputStream.java:1666)
at java.io.ObjectInputStream.readobject(ObjectInputStream.java:502)
at java.io.ObjectInputStream.readobject(ObjectInputStream.java:460)
at scala.collection.immutable.List$SerializationProxy.readobject(List.scala:488)
at sun.reflect.GeneratedMethodAccessor9.invoke(UnkNown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadobject(ObjectStreamClass.java:1184)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2295)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
at java.io.ObjectInputStream.readobject0(ObjectInputStream.java:1666)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
at java.io.ObjectInputStream.readobject0(ObjectInputStream.java:1666)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2404)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2328)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
at java.io.ObjectInputStream.readobject0(ObjectInputStream.java:1666)
at java.io.ObjectInputStream.readobject(ObjectInputStream.java:502)
at java.io.ObjectInputStream.readobject(ObjectInputStream.java:460)
at org.apache.spark.serializer.JavaDeserializationStream.readobject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
如果 SparkSession 的 master 设置为 local[*],则程序正常工作。
// Local mode works because driver and executors share one JVM and classpath,
// so the application's lambda classes are always loadable.
implicit val oSparkOrigin = SparkSession.builder()
  .master("local[*]")
  .appName("spark-api")
  .getOrCreate() // FIX: was `.getorCreate()` — no such method on the builder.
但我必须使用 Spark 主 URL。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。