如何解决Kryo编码器v.s. Spark数据集中的RowEncoder
以下示例的目的是了解Spark数据集中两种编码器的区别。
我可以这样做:
val df = Seq((1,"a"),(2,"d")).toDF("id","value")
import org.apache.spark.sql.{Encoder,Encoders,Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
val myStructType = StructType(Seq(StructField("id",IntegerType),StructField("value",StringType)))
implicit val myRowEncoder = RowEncoder(myStructType)
val ds = df.map{case row => row}
ds.show
//+---+-----+
//| id|value|
//+---+-----+
//| 1| a|
//| 2| d|
//+---+-----+
我也可以这样做:
val df = Seq((1,Row}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
implicit val myKryoEncoder: Encoder[Row] = Encoders.kryo[Row]
val ds = df.map{case row => row}
ds.show
//+--------------------+
//| value|
//+--------------------+
//|[01 00 6F 72 67 2...|
//|[01 00 6F 72 67 2...|
//+--------------------+
代码的唯一区别是:一种使用 Kryo编码器,另一种使用 RowEncoder 。
问题:
解决方法
Encoders.kryo只需创建一个编码器,即可使用Kryo 序列化T类型的对象
RowEncoder是Scala中具有apply和其他工厂方法的对象。 RowEncoder可以从模式创建ExpressionEncoder [Row]。 在内部,apply为Row类型创建一个BoundReference,并为输入模式返回一个ExpressionEncoder [Row],一个CreateNamedStruct序列化器(使用serializerFor内部方法),一个模式的反序列化器和Row类型
RowEncoder了解架构,并将其用于序列化和反序列化。
Kryo比Java序列化(通常多达10倍)要快得多,而且更紧凑,但不支持所有Serializable类型,并且需要您预先注册要在程序中使用的类,以实现最佳性能。
Kryo可以有效地存储大型数据集和网络密集型应用程序。
有关更多信息,您可以参考以下链接:
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-RowEncoder.html https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-Encoders.html https://medium.com/@knoldus/kryo-serialization-in-spark-55b53667e7ab https://stackoverflow.com/questions/58946987/what-are-the-pros-and-cons-of-java-serialization-vs-kryo-serialization#:~:text=Kryo%20is%20significantly%20faster%20and,in%20advance%20for%20best%20performance。
,根据Spark的文档,SparkSQL不使用Kryo或Java 序列化(通常)。
Kyro适用于RDD,不适用于数据框或数据集。因此,问题是 一点点光束afaik。
Does Kryo help in SparkSQL? 这详细介绍了自定义对象,但是...
更新一些空闲时间后的答案
您的示例并不是我所说的自定义类型。他们是 只是带有基本体的结构。没问题。
Kyro是序列化程序,DS,DF使用编码器来获得列优势。 Spark内部使用Kyro进行改组。
此用户定义的示例
case class Foo(name: String,position: Point)
是我们可以使用DS或DF或通过kyro进行的示例。那是什么 Tungsten和Catalyst的工作重点是“了解 数据的结构”?并因此能够进行优化。您还可以获得 kyro的单个二进制值,我发现了一些有关如何 成功使用它,例如加入。
KYRO示例
import org.apache.spark.sql.{Encoder,Encoders,SQLContext}
import org.apache.spark.{SparkConf,SparkContext}
import spark.implicits._
case class Point(a: Int,b: Int)
case class Foo(name: String,position: Point)
implicit val PointEncoder: Encoder[Point] = Encoders.kryo[Point]
implicit val FooEncoder: Encoder[Foo] = Encoders.kryo[Foo]
val ds = Seq(new Foo("bar",new Point(0,0))).toDS
ds.show()
返回:
+--------------------+
| value|
+--------------------+
|[01 00 D2 02 6C 6...|
+--------------------+
使用案例类Example的DS编码器
import org.apache.spark.sql.{Encoder,position: Point)
val ds = Seq(new Foo("bar",0))).toDS
ds.show()
返回:
+----+--------+
|name|position|
+----+--------+
| bar| [0,0]|
+----+--------+
这使我成为使用Spark,钨,催化剂的一种方式。
现在,当涉及到Any时,更复杂的事情就是这样,但是Any并不是一件好事:
val data = Seq(
("sublime",Map(
"good_song" -> "santeria","bad_song" -> "doesn't exist")
),("prince_royce",Map(
"good_song" -> 4,"bad_song" -> "back it up")
)
)
val schema = List(
("name",StringType,true),("songs",MapType(StringType,true)
)
val rdd= spark.sparkContext.parallelize(data)
rdd.collect
val df = spark.createDataFrame(rdd)
df.show()
df.printSchema()
返回:
Java.lang.UnsupportedOperationException: No Encoder found for Any.
然后这个示例很有趣,它是一个有效的自定义对象用例 Spark No Encoder found for java.io.Serializable in Map[String,java.io.Serializable]。但我会远离这种情况。
结论
-
Kryo vs Encoder vs Java Serialization in Spark?指出,kyro用于RDD,但用于传统。内部可以使用它。不是100%正确,但实际上是正确的。
-
Spark: Dataset Serialization也是一个信息链接。
-
东西已经发展,其精神是不要在DS,DF中使用kyro。
希望这会有所帮助。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。