Suppose I have the following DataFrame:
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null|  3|null|  11|
|  2|    null|  2| xxx|  22|
|  1|    null|  1| yyy|null|
|  2|    null|  7|null|  33|
|  1|    null| 12|null|null|
|  2|    null| 19|null|  77|
|  1|    null| 10| s13|null|
|  2|    null| 11| a23|null|
+---+--------+---+----+----+
Here is the same sample DF with annotations, sorted by grp and ord:
scala> df.orderBy("grp","ord").show
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null|  1| yyy|null|
|  1|    null|  3|null|  11|   # grp:1 - last value for `col2` (11)
|  1|    null| 10| s13|null|   # grp:1 - last value for `col1` (s13)
|  1|    null| 12|null|null|   # grp:1 - last values for `null_col`, `ord`
|  2|    null|  2| xxx|  22|
|  2|    null|  7|null|  33|
|  2|    null| 11| a23|null|   # grp:2 - last value for `col1` (a23)
|  2|    null| 19|null|  77|   # grp:2 - last values for `null_col`, `ord`, `col2`
+---+--------+---+----+----+
I want to compact it: group by column "grp", and within each group sort the rows by column "ord" and take the last non-null value in each column (if there is one).
+---+--------+---+----+----+
|grp|null_col|ord|col1|col2|
+---+--------+---+----+----+
|  1|    null| 12| s13|  11|
|  2|    null| 19| a23|  77|
+---+--------+---+----+----+
I've seen the following similar questions:
> How to select the first row of each group?
> How to find first non-null values in groups? (secondary sorting using dataset api)
But my real DataFrame has more than 250 columns, so I need a solution where I don't have to specify all of the columns explicitly.
I can't get my head around it…
MCVE: how to create the sample DataFrame (a file-free alternative is sketched right after these steps):
> create a local file "/tmp/data.txt" and copy & paste the contents of the DataFrame (as shown above)
> define the function readSparkOutput():
> parse "/tmp/data.txt" into a DataFrame:
val df = readSparkOutput("file:///tmp/data.txt")
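If you would rather not go through a file at all, the same sample can be built in memory. This is just a sketch: it assumes a spark-shell session (so spark and spark.implicits._ are available), and the column types (Int for grp/ord, Option[Int] for col2, Option[String] for the rest) are my assumption rather than anything inferred from the file.

// Sketch: build the sample DataFrame directly instead of parsing /tmp/data.txt.
import spark.implicits._

val data: Seq[(Int, Option[String], Int, Option[String], Option[Int])] = Seq(
  (1, None,  3, None,        Some(11)),
  (2, None,  2, Some("xxx"), Some(22)),
  (1, None,  1, Some("yyy"), None),
  (2, None,  7, None,        Some(33)),
  (1, None, 12, None,        None),
  (2, None, 19, None,        Some(77)),
  (1, None, 10, Some("s13"), None),
  (2, None, 11, Some("a23"), None)
)

val df = data.toDF("grp", "null_col", "ord", "col1", "col2")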
UPDATE: I think it should be something like the following SQL:
SELECT grp, ord, null_col, col1, col2
FROM (
    SELECT grp, ord,
           First(null_col) OVER (PARTITION BY grp ORDER BY ord DESC) AS null_col,
           First(col1)     OVER (PARTITION BY grp ORDER BY ord DESC) AS col1,
           First(col2)     OVER (PARTITION BY grp ORDER BY ord DESC) AS col2,
           ROW_NUMBER()    OVER (PARTITION BY grp ORDER BY ord DESC) AS rn
    FROM table_name
) AS v
WHERE v.rn = 1;
How can we dynamically generate such a Spark query?
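Here is a minimal sketch of how such a statement could be generated from df.columns. It assumes the DataFrame is registered as a temp view named table_name, and it adds an explicit ROWS frame so that First(..., true) can look past the current row (the same idea as the rowsBetween frame in the answer below); both of those are my assumptions, not part of the original query.

// Sketch: generate the SQL above from df.columns.
df.createOrReplaceTempView("table_name")

val keyCols  = Seq("grp", "ord")
val dataCols = df.columns.filterNot(keyCols.contains)

val over = "OVER (PARTITION BY grp ORDER BY ord DESC " +
  "ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)"

// One First(..., true) window expression per non-key column
val windowed = dataCols
  .map(c => s"First($c, true) $over AS $c")
  .mkString(",\n         ")

val sql =
  s"""SELECT ${df.columns.mkString(", ")}
     |FROM (
     |  SELECT grp, ord,
     |         $windowed,
     |         ROW_NUMBER() OVER (PARTITION BY grp ORDER BY ord DESC) AS rn
     |  FROM table_name
     |) v
     |WHERE v.rn = 1""".stripMargin

spark.sql(sql).show()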
I tried the following simplified approach:
import org.apache.spark.sql.expressions.Window

val win = Window
  .partitionBy("grp")
  .orderBy($"ord".desc)

val cols = df.columns.map(c => first(c, ignoreNulls = true).over(win).as(c))
which produces:
scala> cols
res23: Array[org.apache.spark.sql.Column] = Array(
  first(grp, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `grp`,
  first(null_col, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `null_col`,
  first(ord, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `ord`,
  first(col1, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col1`,
  first(col2, true) OVER (PARTITION BY grp ORDER BY ord DESC NULLS LAST UnspecifiedFrame) AS `col2`)
But I can't pass it to df.select:
scala> df.select(cols.head, cols.tail: _*).show
<console>:34: error: no `: _*' annotation allowed here
(such annotations are only allowed in arguments to *-parameters)
       df.select(cols.head, cols.tail: _*).show
Another attempt:
scala> df.select(cols.map(col): _*).show
<console>:34: error: type mismatch;
 found   : String => org.apache.spark.sql.Column
 required: org.apache.spark.sql.Column => ?
       df.select(cols.map(col): _*).show
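As far as I can tell, both errors come down to Dataset.select's overloads: cols already holds Columns, so there is nothing to map through col, and the head-plus-expanded-tail pattern only fits the select(col: String, cols: String*) overload, while the Column* overload needs the whole sequence expanded at once. Passing the entire array compiles, though note this alone still produces one row per input row, not one row per group:

// cols is already an Array[Column], so expand it directly into select's Column* vararg.
df.select(cols: _*).show()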
Solution
I would take the same approach as @LeoC, but I believe there is no need to manipulate the column names as strings, and I would go for a more spark-sql-like answer.
import org.apache.spark.sql.Column
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, first, last}

val win = Window.partitionBy("grp").orderBy(col("ord")).rowsBetween(0, Window.unboundedFollowing)

// In case there is more than one group column
val nonAggCols = Seq("grp")

// Select columns to aggregate on
val cols: Seq[String] = df.columns.diff(nonAggCols).toSeq

// Map over selection and apply fct
val aggregations: Seq[Column] = cols.map(c => first(col(c), ignoreNulls = true).as(c))

// I'd rather cache the following step as it might get expensive
val step1 = cols.foldLeft(df)((acc, c) => acc.withColumn(c, last(col(c), ignoreNulls = true).over(win))).cache

// Finally we can aggregate our results as followed
val results = step1.groupBy(nonAggCols.head, nonAggCols.tail: _*).agg(aggregations.head, aggregations.tail: _*)

results.show

// +---+--------+---+----+----+
// |grp|null_col|ord|col1|col2|
// +---+--------+---+----+----+
// |  1|    null| 12| s13|  11|
// |  2|    null| 19| a23|  77|
// +---+--------+---+----+----+
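As a side note, with a frame spanning the whole group you can skip the groupBy/agg step entirely and deduplicate instead. This is only a sketch of that variant (same idea, just trading the aggregation for dropDuplicates):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last}

// A frame covering the entire group: every row then carries the group's last non-null values.
val fullWin = Window
  .partitionBy("grp")
  .orderBy(col("ord"))
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val compacted = df.columns
  .filterNot(_ == "grp")
  .foldLeft(df)((acc, c) => acc.withColumn(c, last(col(c), ignoreNulls = true).over(fullWin)))
  .dropDuplicates("grp")

compacted.show()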
I hope this helps.
Edit: the reason you are not getting the same results is that the reader you are using is not correct.
It interprets null from the file as a string and not as an actual null; i.e.:
scala> df.filter('col1.isNotNull).show
// +---+--------+---+----+----+
// |grp|null_col|ord|col1|col2|
// +---+--------+---+----+----+
// |  1|    null|  3|null|  11|
// |  2|    null|  2| xxx|  22|
// |  1|    null|  1| yyy|null|
// |  2|    null|  7|null|  33|
// |  1|    null| 12|null|null|
// |  2|    null| 19|null|  77|
// |  1|    null| 10| s13|null|
// |  2|    null| 11| a23|null|
// +---+--------+---+----+----+
Here is my version of readSparkOutput:
import org.apache.spark.sql.functions.{col, when}

def readSparkOutput(filePath: String): org.apache.spark.sql.DataFrame = {
  val step1 = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("delimiter", "|")
    .option("parserLib", "univocity")
    .option("ignoreLeadingWhiteSpace", "true")
    .option("ignoreTrailingWhiteSpace", "true")
    .option("comment", "+")
    .csv(filePath)

  // Drop the empty columns produced by the leading/trailing "|" delimiters
  val step2 = step1.select(step1.columns.filterNot(_.startsWith("_c")).map(step1(_)): _*)

  // Turn the literal string "null" into a real null
  // (when without otherwise yields null for non-matching rows)
  val columns = step2.columns
  columns.foldLeft(step2)((acc, c) => acc.withColumn(c, when(col(c) =!= "null", col(c))))
}
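With this reader, re-running the earlier filter should now only return the rows that really have a value in col1; a quick sanity check, assuming the same /tmp/data.txt as above:

val df = readSparkOutput("file:///tmp/data.txt")

// "null" cells are now real nulls, so only rows with an actual col1 value should remain.
df.filter('col1.isNotNull).show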