如何解决提高性能 Pyspark RDD
考虑到我目前使用的数据集的维度,我开始使用 PySpark 在 Databricks 中工作。几周后,我仍然难以完全理解幕后发生的事情。
我有一个大约 4000 万行的数据集,我应用这个函数在滚动窗口中动态计算一些聚合:
def freshness(df):
days = lambda x: x*60*60*24
w = Window.partitionBy('csecid','date').orderBy('date')
w1 = Window.partitionBy('csecid').orderBy(F.col('date').cast('timestamp').cast('long')).rangeBetween(-days(100),0)
w2 = Window.partitionBy('csecid').orderBy('date')
w3 = Window.partitionBy('csecid','id','date').orderBy('date')
w4 = Window.partitionBy('csecid','id')
w5 = Window.partitionBy('csecid','id').orderBy(F.col('date').desc())
df = df.withColumn('dateid',F.row_number().over(w))
df1 = df.withColumn('flag',F.collect_list('date').over(w1))
df1 = df1.withColumn('id',F.row_number().over(w2)).select('csecid','flag','id')
df1 = df1.withColumn('date',F.explode('flag')).drop('flag')
df1 = df1.withColumn('dateid',F.row_number().over(w3))
df2 = df1.join(df,on=['csecid','date','dateid'],how='left')
df3 = (df2
.withColumn('analyst_fresh',F.floor(F.approx_count_distinct('analystid',0.005).over(w4)/3))
.orderBy('csecid',F.col('perenddate').desc())
.groupBy('csecid','analystid')
.agg(F.last('date',True).alias('date'),F.last('analyst_fresh',True).alias('analyst_fresh'))
.where(F.col('analystid').isNotNull())
.orderBy('csecid','date')
.withColumn('id2',F.row_number().over(w5))
.withColumn('freshness',F.when(F.col('id2')<=F.col('analyst_fresh'),1).otherwise(0))
.drop('analyst_fresh','id2','analystid')
)
df_fill = _get_fill_dates_df(df3,['id','csecid'])
df3 = df3.join(df_fill,'freshness'],how='outer')
df3 = df3.groupBy('csecid','date').agg(F.max('freshness').alias('freshness'))
df3 = df2.join(df3,'date'],how='left').fillna(0,subset=['freshness'])
df3 = df3.withColumn('fresh_revision',(F.abs(F.col('revisions_improved'))+F.col('freshness'))*F.signum('revisions_improved'))
df4 = (df3
.orderBy('csecid',F.col('perenddate').desc())
.groupBy('csecid','analystid')
.agg(F.last('date').alias('date'),F.last('fresh_revision',True).alias('fresh_revision'))
.orderBy('csecid','date')
.groupBy('csecid','id').agg(F.last('date').alias('date'),F.sum('fresh_revision').alias('fresh_revision'),F.sum(F.abs('fresh_revision')).alias('n_revisions'))
.withColumn('revision_index_improved',F.col('fresh_revision') / F.col('n_revisions'))
.groupBy('csecid','date')
.agg(F.first('revision_index_improved').alias('revision_index_improved'))
)
df5 = df.join(df4,how='left').orderBy('csecid','date')
return df5
weight_list = ['leader','truecall']
for c in weight_list:
df = df.withColumn('revisions_improved',(F.abs(F.col('revisions_improved'))+F.col(c))*F.col('revisions'))
df = freshness(df)
这段代码运行了大约 2 个小时,我注意到大部分计算时间都来自使用单个执行程序的作业(共 16 个可用)。
我读到可以通过使用 .repartition()
重新分配数据帧来克服这个问题。我的问题如下:如何找到上述作业引用的代码?我应该在哪里重新分区我的数据框?这是一个正确的解决方案吗?
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。