如何解决PySpark 函数基于多列数据框创建自定义输出
A | B | C | D | E | F | G |
---|---|---|---|---|---|---|
145 | 589 | 1 | 1 | 12 | 25 | |
145 | 589 | 1 | 2 | 1ad34 | ||
145 | 589 | 1 | 3 | 257 | 18 | 55 |
145 | 589 | 2 | 1 | 12 | 25 | |
145 | 589 | 2 | 2 | 22 | 45 | |
145 | 589 | 2 | 3 | |||
145 | 589 | 3 | 1 | 32 | 55 | |
145 | 589 | 3 | 2 |
表格概览:
- A 和 B 列的组合将索引 C 列。对于每个索引的 C 列,我们将有 D 列。 A|B|C|D 的串联标识唯一记录。
- 对于以下完整的数据帧,检查是否在数据帧记录遍历的任何点设置了 E 列。如果是,则返回第一个数值(例如,结果应为 257,应忽略 1ad34)这将是优先级为 1 的操作。
- 如果从不设置 E 列,则返回 F 和 G 的串联以用于最后一行组合。如果永远不会在 E 列上设置 257,则根据 145|589|3|1 返回 3255。
测试用例 1:优先级列 E 包含的值很少。第一个数字是 257。所以对于 145|589,我们的输出应该是 257。
测试用例 2:优先级列 E 完全为空,然后选取 F 和 G 列的最后一个串联值,结果应为 3255 for 145|589
def get_resulting_id(grouped_A_B_df):
try :
out=''
first_E_val_df=grouped_A_B_df.filter(col("E").cast("int").isNotNull()).first()
if ( first_E_val_df):
return first_E_val_df["E"]
unique_C = [x.C for x in grouped_A_B_df.select('C').distinct().collect()]
for uniq in unique_C :
for row in uniq.rdd.toLocalIterator():
out=str(row['F'])+str(row['G'])
except:
raise Exception("Func Failed")
return out
由于源数据帧有 2000 万条记录,我不想在优先级 2 条件下使用 localiterator,任何可能的方法来加速操作。由 A 列和 B 列组合分区的源数据帧将给出子集数据帧。我希望我的自定义函数应用于该子集数据帧并返回每个子集数据帧的结果。
解决方法
根据您提供的示例输入数据,不确定确切了解您的预期输出是什么。我尝试了你的函数,输出是“257”,所以这里是我的完整 pyspark 代码,它应该提供相同的输出:
<div *ngFor="let item of testObject | keyvalue" [hidden]="item.value.length == 0">
Key: <b>{{item.key}}</b>
<div *ngFor="let item1 of item.value">
Value: <b>{{item1}}</b>
</div>
<br>
</div>
如果您需要 Pandas df 作为输出,您可以将 from pyspark.sql import functions as F,Window as W
df.select(
"A","B",F.coalesce(
F.first("E",ignorenulls=True).over(
W.partitionBy("A","B")
.orderBy("C","D")
.rowsBetween(W.unboundedPreceding,W.unboundedFollowing)
),F.last(F.concat(F.col("F"),F.col("G")),).alias("out"),).distinct().show()
+---+---+---+
| A| B|out|
+---+---+---+
|145|589|257|
+---+---+---+
替换为 .show()
。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。