如何解决Spark汇总/分组依据,以便根据集合中的col值确定新列的值
我有一些数据将按ID分组。
id,field
0 A
0 B
0 C
1 B
1 B
1 C
2 E
- 我想按ID分组并计算一个简单的新值
is_special
,即group by id,if any(field) is in a special set {A,E}
(只是一组随机字母,没有模式)。
id,is_special
0 True
1 False
2 True
类似this question,但在pyspark中。
- 我想了解如何进行分组而不实际分组,只需创建一个新列即可:
id,field,is_special
0 A,True
0 B,True
0 C,True
1 B,False
1 B,False
1 C,False
2 E,True
我认为可以使用以下某些方法来完成此操作,但是我不知道如何将window
与when
一起使用。
from F import when,col,coalesce
special = ['A','E']
window = Window.partitionBy('product_ari')
df.withColumn("is_special",when(col("field").isin(special),lit(True))
)
解决方法
创建测试集:
a = [
(0,"A"),(0,"B"),"C"),(1,(2,"E"),]
b = ["id","field"]
df = spark.createDataFrame(a,b)
set_ = ("A","E")
这样做的秘密方式。
- 加入
from pyspark.sql import functions as F
agg_df = (
df.withColumn(
"is_special",F.when(F.expr(f"field in {set_}"),True).otherwise(False)
)
.groupBy("id")
.agg(F.max("is_special").alias("is_special"))
)
df.join(agg_df,on="id",how="left").show()
+---+-----+----------+
| id|field|is_special|
+---+-----+----------+
| 0| A| true|
| 0| B| true|
| 0| C| true|
| 1| B| false|
| 1| B| false|
| 1| C| false|
| 2| E| true|
+---+-----+----------+
- 带窗户
from pyspark.sql import Window
df.withColumn(
"is_special",True).otherwise(False)
).withColumn("is_special",F.max("is_special").over(Window.partitionBy("id"))).show()
# OR "one-liner"
df.withColumn(
"is_special",F.max(F.when(F.expr(f"field in {set_}"),True).otherwise(False)).over(
Window.partitionBy("id")
),).show()
+---+-----+----------+
| id|field|is_special|
+---+-----+----------+
| 0| A| true|
| 0| B| true|
| 0| C| true|
| 1| B| false|
| 1| B| false|
| 1| C| false|
| 2| E| true|
+---+-----+----------+
,
对于一些知识上的随身携带,以下工作也是如此:
from pyspark.sql.functions import (
array_intersect,size,array_except,collect_set,lit,array,explode,)
df = sc.parallelize(
[
(0,"G"),"J"),(3,(4,(5,(6,"Z"),]
).toDF(["id","field"])
df2 = df.groupby("id").agg(collect_set("field").alias("X"))
df3a = df2.filter(size(array_intersect(df2["X"],lit(array(lit("E"),lit("A"))))) >= 1)
df3b = df2.filter(size(array_intersect(df2["X"],lit("A"))))) == 0)
df4 = (
df3a.select(df3a.id,explode(df3a.X).alias("field"))
.withColumn("is_special",lit(True))
.union(
df3b.select(df3b.id,explode(df3b.X).alias("field")).withColumn(
"is_special",lit(False)
)
)
)
df4.show()
返回:
+---+-----+----------+
| id|field|is_special|
+---+-----+----------+
| 0| C| true|
| 0| B| true|
| 0| A| true|
| 5| E| true|
| 5| A| true|
| 3| A| true|
| 2| J| true|
| 2| E| true|
| 2| G| true|
| 2| A| true|
| 4| E| true|
| 6| Z| false|
| 1| C| false|
| 1| B| false|
+---+-----+----------+
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。