微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

将数据框转换为数据集会保留额外的列

如何解决将数据框转换为数据集会保留额外的列

Spark 2.11 中,将 Dataframe 转换为 Dataset 时,spark 会保留在数据集的类中甚至未引用的额外列。

scala> case class F(x: String,y: String)
defined class F

scala> import spark.implicits._
import spark.implicits._

scala> val df = Seq(("1a","2a","3a","4a"),("5a","6a","7a","8a")).toDF("w","x","y","z")
df: org.apache.spark.sql.DataFrame = [w: string,x: string ... 2 more fields]

scala> val ds = df.to
toDF   toJSON   toJavaRDD   toLocalIterator   toString

scala> val ds = df.as
as   asInstanceOf

scala> val ds = df.as[F]
ds: org.apache.spark.sql.Dataset[F] = [w: string,x: string ... 2 more fields]

scala> ds.show()
+---+---+---+---+
|  w|  x|  y|  z|
+---+---+---+---+
| 1a| 2a| 3a| 4a|
| 5a| 6a| 7a| 8a|
+---+---+---+---+

这将它的类型安全方面抛诸脑后。有没有办法防止这种情况发生?

我使用 ds.map(identity) 读到,因为转换应该是惰性的,但在尝试写入数据帧时不会触发。此外,不可能编写测试来覆盖 .map(identity) 并防止删除

解决方法

您可以通过多种方式解决此行为,但这是 Scala 中最简单、最高效的选项之一,用于定义您自己的 to[T] 方法,该方法仅选择适当的列:

import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql._

object SparkExtensions {
  implicit class ExtendedDataFrame(df: DataFrame) {
    def to[T <: Product: TypeTag]: Dataset[T] = {
      import df.sparkSession.implicits._
      import org.apache.spark.sql.functions.col
      df.select(Encoders.product[T].schema.map(f => col(f.name)): _*).as[T]
    }
  }
}

您可以通过以下方式使用:

scala> case class F(x: String,y: String)
defined class F

scala> import SparkExtensions._
import SparkExtensions._

scala> val df = Seq(("1a","2a","3a","4a"),("5a","6a","7a","8a")).toDF("w","x","y","z")
df: org.apache.spark.sql.DataFrame = [w: string,x: string ... 2 more fields]

scala> val ds: Dataset[F] = df.to[F]
ds: org.apache.spark.sql.Dataset[F] = [w: string,x: string]

scala> ds.show()
+---+---+
|  w|  x|
+---+---+
| 1a| 2a|
| 5a| 6a|
+---+---+

如果您不想定义隐式类,您总是可以简单地执行 df.as[F].rdd.toDS 以稍微降低运行时效率的方式实现相同的结果。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。