如何解决任务不可序列化:java.io.NotSerializableException 仅在类而非对象上调用闭包外的函数时
RDD 扩展了 Serialisable 接口,因此这不是导致您的任务失败的原因。现在这并不意味着您可以RDD
使用 Spark 序列化并避免NotSerializableException
Spark是一个分布式计算引擎,它的主要抽象是一个弹性分布式数据集(),可以看作是一个分布式集合。基本上,RDD 的元素在集群的节点上进行分区,但 Spark 将其从用户那里抽象出来,让用户与 RDD(集合)进行交互,就好像它是本地的一样。
不涉及太多细节,但是当您在 RDD( 、 和其他)上运行不同的转换时map
,flatMap
您filter
的转换代码(闭包)是:
- 在驱动节点上序列化,
- 运送到集群中的适当节点,
- 反序列化,
- 最后在节点上执行
您当然可以在本地运行它(如您的示例中所示),但所有这些阶段(除了通过网络发送)仍然会发生。[这让您甚至可以在部署到生产环境之前发现任何错误]
在您的第二种情况下发生的是您正在调用一个方法,该方法testing
在 map 函数内部的类中定义。Spark 看到了这一点,并且由于方法无法自行序列化,Spark 尝试序列化整个 testing
类,以便代码在另一个 JVM 中执行时仍然可以工作。你有两种可能:
要么你让类测试可序列化,所以整个类都可以被 Spark 序列化:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new test().doIT
}
class Test extends java.io.Serializable {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
def someFunc(a: Int) = a + 1
}
或者你创建someFunc
函数而不是方法(函数是 Scala 中的对象),以便 Spark 能够序列化它:
import org.apache.spark.{SparkContext,SparkConf}
object Spark {
val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}
object NOTworking extends App {
new test().doIT
}
class Test {
val rddList = Spark.ctx.parallelize(List(1,2,3))
def doIT() = {
val after = rddList.map(someFunc)
after.collect().foreach(println)
}
val someFunc = (a: Int) => a + 1
}
您可能会对类序列化的类似但不同的问题感兴趣,您可以在此 Spark 峰会 2013 演示文稿中阅读它。
作为旁注,您可以重写rddList.map(someFunc(_))
为rddList.map(someFunc)
,它们完全相同。通常,第二个是首选,因为它不那么冗长且易于阅读。
编辑(2015-03-15):SPARK-5307引入了,Spark 1.3.0 是第一个使用它的版本。它将序列化路径添加到NotSerializableException。当遇到 NotSerializableException 时,调试器访问对象图以查找无法序列化的对象的路径,并构造信息以帮助用户查找对象。
Serialization stack:
- object not serializable (class: testing, value: testing@2dfe2f00)
- field (class: testing$$anonfun$1, name: $outer, type: class testing)
- object (class testing$$anonfun$1, <function1>)
解决方法
在闭包之外调用函数时出现奇怪的行为:
- 当函数在对象中时一切正常
- 当函数在一个类中时:
任务不可序列化:java.io.NotSerializableException:测试
问题是我需要我的代码在一个类而不是一个对象中。知道为什么会这样吗?Scala 对象是否序列化(默认?)?
这是一个工作代码示例:
object working extends App {
val list = List(1,2,3)
val rddList = Spark.ctx.parallelize(list)
//calling function outside closure
val after = rddList.map(someFunc(_))
def someFunc(a:Int) = a+1
after.collect().map(println(_))
}
这是非工作示例:
object NOTworking extends App {
new testing().doIT
}
//adding extends Serializable wont help
class testing {
val list = List(1,3)
val rddList = Spark.ctx.parallelize(list)
def doIT = {
//again calling the fucntion someFunc
val after = rddList.map(someFunc(_))
//this will crash (spark lazy)
after.collect().map(println(_))
}
def someFunc(a:Int) = a+1
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。