微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

任务不可序列化:java.io.NotSerializableException 仅在类而非对象上调用闭包外的函数时

如何解决任务不可序列化:java.io.NotSerializableException 仅在类而非对象上调用闭包外的函数时

RDD 扩展了 Serialisable 接口,因此这不是导致您的任务失败的原因。现在这并不意味着您可以RDD使用 Spark 序列化并避免NotSerializableException

Spark是一个分布式计算引擎,它的主要抽象是一个弹性分布式数据集(),可以看作是一个分布式集合。基本上,RDD 的元素在集群的节点上进行分区,但 Spark 将其从用户那里抽象出来,让用户与 RDD(集合)进行交互,就好像它是本地的一样。

不涉及太多细节,但是当您在 RDD( 、 和其他)上运行不同的转换时mapflatMapfilter的转换代码(闭包)是:

  1. 在驱动节点上序列化,
  2. 运送到集群中的适当节点,
  3. 反序列化,
  4. 最后在节点上执行

您当然可以在本地运行它(如您的示例中所示),但所有这些阶段(除了通过网络发送)仍然会发生。[这让您甚至可以在部署到生产环境之前发现任何错误]

在您的第二种情况下发生的是您正在调用一个方法,该方法testing在 map 函数内部的类中定义。Spark 看到了这一点,并且由于方法无法自行序列化,Spark 尝试序列化整个 testing类,以便代码在另一个 JVM 中执行时仍然可以工作。你有两种可能:

要么你让类测试可序列化,所以整个类都可以被 Spark 序列化:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new test().doIT
}

class Test extends java.io.Serializable {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  def someFunc(a: Int) = a + 1
}

或者你创建someFunc函数而不是方法函数是 Scala 中的对象),以便 Spark 能够序列化它:

import org.apache.spark.{SparkContext,SparkConf}

object Spark {
  val ctx = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

object NOTworking extends App {
  new test().doIT
}

class Test {
  val rddList = Spark.ctx.parallelize(List(1,2,3))

  def doIT() =  {
    val after = rddList.map(someFunc)
    after.collect().foreach(println)
  }

  val someFunc = (a: Int) => a + 1
}

您可能会对类序列化的类似但不同的问题感兴趣,您可以在此 Spark 峰会 2013 演示文稿中阅读它。

作为旁注,您可以重写rddList.map(someFunc(_))rddList.map(someFunc),它们完全相同。通常,第二个是首选,因为它不那么冗长且易于阅读。

编辑(2015-03-15):SPARK-5307引入了,Spark 1.3.0 是第一个使用它的版本。它将序列化路径添加NotSerializableException。当遇到 NotSerializableException 时,调试器访问对象图以查找无法序列化的对象的路径,并构造信息以帮助用户查找对象。

在 OP 的情况下,这是打印到标准输出内容

Serialization stack:
    - object not serializable (class: testing, value: testing@2dfe2f00)
    - field (class: testing$$anonfun$1, name: $outer, type: class testing)
    - object (class testing$$anonfun$1, <function1>)

解决方法

在闭包之外调用函数时出现奇怪的行为:

  • 当函数在对象中时一切正常
  • 当函数在一个类中时:

任务不可序列化:java.io.NotSerializableException:测试

问题是我需要我的代码在一个类而不是一个对象中。知道为什么会这样吗?Scala 对象是否序列化(默认?)?

这是一个工作代码示例:

object working extends App {
    val list = List(1,2,3)

    val rddList = Spark.ctx.parallelize(list)
    //calling function outside closure 
    val after = rddList.map(someFunc(_))

    def someFunc(a:Int)  = a+1

    after.collect().map(println(_))
}

这是非工作示例:

object NOTworking extends App {
  new testing().doIT
}

//adding extends Serializable wont help
class testing {  
  val list = List(1,3)  
  val rddList = Spark.ctx.parallelize(list)

  def doIT =  {
    //again calling the fucntion someFunc 
    val after = rddList.map(someFunc(_))
    //this will crash (spark lazy)
    after.collect().map(println(_))
  }

  def someFunc(a:Int) = a+1
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。