如何解决将数据框转换为数据集会保留额外的列
在 Spark 2.11
中,将 Dataframe 转换为 Dataset 时,spark 会保留在数据集的类中甚至未引用的额外列。
scala> case class F(x: String,y: String)
defined class F
scala> import spark.implicits._
import spark.implicits._
scala> val df = Seq(("1a","2a","3a","4a"),("5a","6a","7a","8a")).toDF("w","x","y","z")
df: org.apache.spark.sql.DataFrame = [w: string,x: string ... 2 more fields]
scala> val ds = df.to
toDF toJSON toJavaRDD toLocalIterator toString
scala> val ds = df.as
as asInstanceOf
scala> val ds = df.as[F]
ds: org.apache.spark.sql.Dataset[F] = [w: string,x: string ... 2 more fields]
scala> ds.show()
+---+---+---+---+
| w| x| y| z|
+---+---+---+---+
| 1a| 2a| 3a| 4a|
| 5a| 6a| 7a| 8a|
+---+---+---+---+
这将它的类型安全方面抛诸脑后。有没有办法防止这种情况发生?
我使用 ds.map(identity)
读到,因为转换应该是惰性的,但在尝试写入数据帧时不会触发。此外,不可能编写测试来覆盖 .map(identity)
并防止删除。
解决方法
您可以通过多种方式解决此行为,但这是 Scala 中最简单、最高效的选项之一,用于定义您自己的 to[T]
方法,该方法仅选择适当的列:
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql._
object SparkExtensions {
implicit class ExtendedDataFrame(df: DataFrame) {
def to[T <: Product: TypeTag]: Dataset[T] = {
import df.sparkSession.implicits._
import org.apache.spark.sql.functions.col
df.select(Encoders.product[T].schema.map(f => col(f.name)): _*).as[T]
}
}
}
您可以通过以下方式使用:
scala> case class F(x: String,y: String)
defined class F
scala> import SparkExtensions._
import SparkExtensions._
scala> val df = Seq(("1a","2a","3a","4a"),("5a","6a","7a","8a")).toDF("w","x","y","z")
df: org.apache.spark.sql.DataFrame = [w: string,x: string ... 2 more fields]
scala> val ds: Dataset[F] = df.to[F]
ds: org.apache.spark.sql.Dataset[F] = [w: string,x: string]
scala> ds.show()
+---+---+
| w| x|
+---+---+
| 1a| 2a|
| 5a| 6a|
+---+---+
如果您不想定义隐式类,您总是可以简单地执行 df.as[F].rdd.toDS
以稍微降低运行时效率的方式实现相同的结果。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。