How to pivot a Spark DataFrame within groups without aggregation
I have this DataFrame:
id | name | q1_w | q1_x | q1_y | q1_z | q2_w | q2_x | q2_y | q2_z |
---|---|---|---|---|---|---|---|---|---|
1 | AAAA | val1 | val2 | val3 | val4 | valw | valx | valy | valz |
2 | BBBB | del1 | del2 | del3 | del4 | delw | delx | dely | delz |
3 | CCCC | sol1 | sol2 | sol3 | sol4 | null | null | null | null |
and I am trying to transform it into this DataFrame:
id | name | w | x | y | z |
---|---|---|---|---|---|
1 | AAAA | val1 | val2 | val3 | val4 |
1 | AAAA | valw | valx | valy | valz |
2 | BBBB | del1 | del2 | del3 | del4 |
2 | BBBB | delw | delx | dely | delz |
3 | CCCC | sol1 | sol2 | sol3 | sol4 |
What DataFrame transformations can help me achieve this WITHOUT
converting to an RDD?
Solution
If the number of columns in the source DataFrame stays fixed, you can simply perform two separate transformations, each selecting one quarter's columns and renaming them, and then union the two DataFrames to get the required output.
//source data creation (the q2 columns of the third row are all null)
val df = Seq(
  (1,"AAAA","val1","val2","val3","val4","valw","valx","valy","valz"),
  (2,"BBBB","del1","del2","del3","del4","delw","delx","dely","delz"),
  (3,"CCCC","sol1","sol2","sol3","sol4",null,null,null,null)
).toDF("id","name","q1_w","q1_x","q1_y","q1_z","q2_w","q2_x","q2_y","q2_z")
//first dataframe: select the q1 columns, keep rows with no nulls, and rename
val df1 = df.select("id","name","q1_w","q1_x","q1_y","q1_z")
  .filter($"q1_w".isNotNull && $"q1_x".isNotNull && $"q1_y".isNotNull && $"q1_z".isNotNull)
  .withColumnRenamed("q1_w","w").withColumnRenamed("q1_x","x")
  .withColumnRenamed("q1_y","y").withColumnRenamed("q1_z","z")
//second dataframe: the same for the q2 columns
val df2 = df.select("id","name","q2_w","q2_x","q2_y","q2_z")
  .filter($"q2_w".isNotNull && $"q2_x".isNotNull && $"q2_y".isNotNull && $"q2_z".isNotNull)
  .withColumnRenamed("q2_w","w").withColumnRenamed("q2_x","x")
  .withColumnRenamed("q2_y","y").withColumnRenamed("q2_z","z")
//union the two dataframes and you get the required output
val finaldf = df1.union(df2)
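If you want to check the select-rename-union logic outside of Spark first, here is a minimal sketch of the same idea in pandas (a hypothetical equivalent, not part of the Spark answer; the helper name `take_quarter` is my own):

```python
import pandas as pd

# Same sample data as in the question; the q2 columns of row 3 are all null.
df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["AAAA", "BBBB", "CCCC"],
    "q1_w": ["val1", "del1", "sol1"], "q1_x": ["val2", "del2", "sol2"],
    "q1_y": ["val3", "del3", "sol3"], "q1_z": ["val4", "del4", "sol4"],
    "q2_w": ["valw", "delw", None], "q2_x": ["valx", "delx", None],
    "q2_y": ["valy", "dely", None], "q2_z": ["valz", "delz", None],
})

def take_quarter(frame, prefix):
    # Select id/name plus one quarter's columns and strip the prefix.
    cols = ["id", "name"] + [f"{prefix}_{c}" for c in "wxyz"]
    part = frame[cols].rename(columns={f"{prefix}_{c}": c for c in "wxyz"})
    # Drop rows containing nulls (mirrors the isNotNull filters above).
    return part.dropna(subset=list("wxyz"), how="any")

final = pd.concat([take_quarter(df, "q1"), take_quarter(df, "q2")],
                  ignore_index=True)
print(final)
```

The all-null q2 row for id 3 is filtered out, leaving the five rows of the desired output.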
Alternatively, you can use the melt
concept from here, then split the column names on `_`
and pivot (note that pivot can be somewhat expensive):
from pyspark.sql import functions as F

id_vars = ['id','name']
value_vars = [i for i in df.columns if i not in id_vars]
value_name = "Val"
var_name = 'Var'
# Build an array of (column-name, value) structs, one per value column
_vars_and_vals = F.array(*(
    F.struct(F.lit(c).alias(var_name), F.col(c).alias(value_name))
    for c in value_vars))
# Add to the DataFrame and explode (the "melt" step)
df1 = df.withColumn("_vars_and_vals", F.explode(_vars_and_vals))
cols = ['id','name'] + [
    F.col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
# Split "q1_w" into the quarter ("q1") and the new column name ("w"), then pivot
split_var = F.split("Var", "_")
out = (df1.select(*cols).withColumn("NewVar", split_var[1])
       .groupby(id_vars + [split_var[0].alias("q")]).pivot("NewVar").agg(F.first("Val")))
out.show()
+---+----+---+----+----+----+----+
| id|name| q| w| x| y| z|
+---+----+---+----+----+----+----+
| 1|AAAA| q1|val1|val2|val3|val4|
| 1|AAAA| q2|valw|valx|valy|valz|
| 2|BBBB| q1|del1|del2|del3|del4|
| 2|BBBB| q2|delw|delx|dely|delz|
| 3|CCCC| q1|sol1|sol2|sol3|sol4|
| 3|CCCC| q2|null|null|null|null|
+---+----+---+----+----+----+----+
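The melt-split-pivot steps above can also be sketched in pandas, which has `melt` and `pivot` built in (a hypothetical illustration of the same reshaping, not the PySpark answer itself):

```python
import pandas as pd

# Same sample data as in the question.
df = pd.DataFrame({
    "id": [1, 2, 3],
    "name": ["AAAA", "BBBB", "CCCC"],
    "q1_w": ["val1", "del1", "sol1"], "q1_x": ["val2", "del2", "sol2"],
    "q1_y": ["val3", "del3", "sol3"], "q1_z": ["val4", "del4", "sol4"],
    "q2_w": ["valw", "delw", None], "q2_x": ["valx", "delx", None],
    "q2_y": ["valy", "dely", None], "q2_z": ["valz", "delz", None],
})

id_vars = ["id", "name"]
value_vars = [c for c in df.columns if c not in id_vars]

# Melt to long form: one (id, name, Var, Val) row per original cell.
long = df.melt(id_vars=id_vars, value_vars=value_vars,
               var_name="Var", value_name="Val")

# Split "q1_w" into the quarter ("q1") and the new column name ("w").
long[["q", "NewVar"]] = long["Var"].str.split("_", expand=True)

# Pivot back so each quarter becomes its own row; (id, name, q, NewVar)
# is unique, so no aggregation is needed.
out = (long.pivot(index=id_vars + ["q"], columns="NewVar", values="Val")
       .reset_index())
print(out)
```

As in the PySpark output, the id 3 / q2 row survives with all-null values, since `melt` carries the nulls through rather than dropping them.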