如何解决pyspark 中的 For-Loops 导致数据帧大小增加和作业失败
我的 pyspark 代码中有一个 for 循环。当我在大约 5 个循环上测试代码时,它工作正常。但是当我在我的核心数据集上运行它导致 160 个循环时,我的 pyspark 作业(在 EMR 集群上提交)失败。它在失败之前先尝试第二次。
以下是作业在 Spark History Server 中运行的屏幕截图:
初始作业 Attempt ID 1
在下午 4:13 运行,4 小时后进行了第二次尝试 Attempt ID 2
,但失败了。当我打开作业时,我没有看到任何失败的任务或阶段。
我猜这是因为 for 循环的大小不断增加。
这是我的伪代码:
#Load Dataframe
df=spark.read.parquet("s3://path")
df=df.persist(StorageLevel.MEMORY_AND_disK) # I will be using this df in the for loop
flist=list(df.select('key').distinct().toPandas()['key'])
output=[]
for i in flist:
df2=df.filter(col('key)==i))
Perform operations on df2 by each key that result in a dataframe df3
output.append(df3)
final_output = reduce(DataFrame.unionByName,output)
我认为 output
数据帧的大小会增加,作业最终会失败。
我正在运行 9 个工作节点和 8 个 vCore,每个节点有 50GB 内存。
有没有办法在一定次数的循环后将 output
数据帧写入检查点,清除内存,然后从它在 Spark 中停止的地方继续循环?
编辑: 我的预期输出是这样的:
key mean prediction
3172742 0.0448 1
3172742 0.0419 1
3172742 0.0482 1
3172742 0.0471 1
3672767 0.0622 2
3672767 0.0551 2
3672767 0.0406 1
我可以使用 groupBy 函数,因为我正在执行 kmeans 聚类并且它不允许 groupBy。所以我必须遍历每个键来执行 kmeans 聚类。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。