微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

EMR 生成的文件的 Spark Kryo 反序列化在本地失败

如何解决EMR 生成的文件的 Spark Kryo 反序列化在本地失败

在将 EMR 版本升级到 6.2.0(我们之前使用的是 5.0 beta - ish)和 Spark 3.0.1 后,我们注意到我们无法在本地读取从 EMR 集群写入的 Kryo 文件(这在以前显然是可能的)。尝试读取这样的文件时,抛出的异常如下:

com.esotericsoftware.kryo.KryoException: java.lang.classCastException: scala.Tuple3 cannot be cast to scala.Tuple2

我们使用 spark 3.0.1 和 Kryo 4.0.2(捆绑)并使用 Kryo::readClassAndobject 读取 Kryo 文件,使用 SparkContext::sequenceFile 在 RDD 读取上进行操作。

解决方法

TL;博士: AWS EMR 6.2.0(也可能更早)导致本地反序列化从 EMR 集群写入的 Kryo 文件失败(由于集群运行 AWS Spark fork)。修复代码附在@帖子末尾。


最近,Amazon EMR 集群运行自己的 Apache Spark 分支(即,对于 EMR 6.2.0 集群,Spark 版本为 3.0.1.amzn-0),其中包含 Kryo 作为默认序列化框架,我们使用我们自己。自从升级到 6.2.0 后,我们注意到我们无法在本地读取从 EMR 6.2.0 集群写入的 Kryo 文件,它们会失败并显示如下消息:

com.esotericsoftware.kryo.KryoException: java.lang.ClassCastException: scala.Tuple3 cannot be cast to scala.Tuple2

我们试图读取的 RDD 确实是 Tuple2 类型的 RDD,但显然在反序列化时,Kryo 认为出于某种原因它被编码为 Tuple3 的 RDD。

现在,在内部,Kryo 持有 ID 类的映射,该映射是在运行时构建的,预计在读取和写入 JVM 之间保持一致(用于确定要反序列化到哪个类)。该注册表建立在 Kryo 实例的实例化之上(我们使用 org.apache.spark.serializer.KryoSerializer::newKryo)。经过检查,我们注意到在执行序列化的 EMR 集群和我们的本地机器之间,Tuple2 的 ID 确实不同,这种差异归因于 EMR 设置中存在的单个类,而不是本地存在的类 - 这个类是 org.apache .spark.scheduler.HighlyCompressedMapStatus$CompressedSizes 在任何公开可用的 Spark 代码中都不存在,因此我们将其归因于 Amazon spark fork。这实际上意味着我们无法在本地读取几乎所有由 EMR 集群编写的类,因为不可能在本地使用 Spark 的那个分支,并且该类在创建 Kryo 实例时注册在 ID 13(以后可能会明显改变) ),导致几乎所有的类都无法反序列化。

这里丑陋的解决方法是使用 Kryo 实例的 ClassResolver。如果 CompressedSizes 类在注册表中不存在,我们将所有 id x >= 13 的类注册为 x + 1。这真的很难看,但作为本地修复,它可以工作。显然,它也可能因 EMR/Kryo/Spark 的新版本而中断,因此请格外小心(我们仅在本地使用它进行调试,这仍然很多)。

代码: 以前,我们会像这样创建 Kryo 实例:

val kryoSerializer = new KryoSerializer(sc.getConf)
val kryo = kryoSerializer.newKryo()

现在,我们使用这个:

val kryo = adjustRegistrationsForEmrSpark(kryoSerializer.newKryo())

哪里

private def adjustRegistrationsForEmrSpark(kryo: Kryo): Kryo = {
    val existingRegistrations = getRegistrations(kryo)
    val emrSpecificClassExists = existingRegistrations.exists(_.getType.getName.contains("CompressedSizes"))
    if (emrSpecificClassExists) {
        println(s"detected emr-specific class when creating kryo,not making any adjustments")
        kryo
    } else {
        println(s"emr-specific class missing from registrations,adjusting existing classes by an offset of 1 to compensate")
        val classResolver = kryo.getClassResolver
        existingRegistrations.filter(_.getId >= 13).foreach { registration =>
            val toRegister = new Registration(registration.getType,registration.getSerializer,registration.getId + 1)
            classResolver.register(toRegister)
        }
        kryo
    }
}

private def getRegistrations(kryo: Kryo): List[Registration] = {
    var classIndex = 0
    var reg: Registration = null
    var result: List[Registration] = List()
    do {
        reg = kryo.getClassResolver.getRegistration(classIndex)
        if (reg != null) result ++= List(reg)
        classIndex = classIndex + 1
    } while (reg != null)
    result
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?