微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用Scala将DataSet传递给在Apache Spark中接受DataFrame作为参数的函数?

如何解决如何使用Scala将DataSet传递给在Apache Spark中接受DataFrame作为参数的函数?

我在Scala中有一个用于Spark的库,其中包含许多功能一个示例是以下函数,用于合并具有不同列的两个数据框:

def appendDF(df2: DataFrame): DataFrame = {

  val cols1 = df.columns.toSeq
  val cols2 = df2.columns.toSeq

  def expr(sourceCols: Seq[String],targetCols: Seq[String]): Seq[Column] = {
    targetCols.map({
      case x if sourceCols.contains(x) => col(x)
      case y                           => lit(null).as(y)
    })
  }

  // both df's need to pass through `expr` to guarantee the same order,as needed for correct unions.
  df.select(expr(cols1,cols1): _*).union(df2.select(expr(cols2,cols1): _*))

}

我想将此功能(以及更多功能)用于Dataset[CleanRow],而不是DataFrames。 CleanRow一个简单的类,用于定义列的名称和类型。 我有根据的猜测是使用.toDF()方法将数据集转换为数据帧。但是,我想知道是否还有更好的方法

据我了解,Dataset和Dataframe之间应该没有太多区别,因为Dataset只是Dataframe [Row]。另外,我认为从Spark 2.x起,用于DF和DS的API已统一,因此我认为我可以互换使用它们,但这不是事实。

解决方法

如果可以更改签名:

import spark.implicits._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset

def f[T](d: Dataset[T]): Dataset[T] = {d}

// You are able to pass a dataframe:
f(Seq(0,1).toDF()).show
// res1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [value: int]

// You are also able to pass a dataset:
f(spark.createDataset(Seq(0,1)))
// res2: org.apache.spark.sql.Dataset[Int] = [value: int]

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。