How to compute a weighted (exponential moving) average of a column in PySpark
I need to compute an exponential moving average in a Spark DataFrame. Input table:
ab = spark.createDataFrame(
    [(1, "1/1/2020", 41.0, 0.5, 0.5, 1, '10.22'),
     (1, "10/3/2020", 24.0, 0.3, 0.7, 2, ''),
     (1, "21/5/2020", 32.0, 0.4, 0.6, 3, ''),
     (2, "3/1/2020", 51.0, 0.22, 0.78, 1, '34.78'),
     (2, "10/5/2020", 14.56, 0.333, 0.66, 2, ''),
     (2, "30/9/2020", 17.0, 0.66, 0.34, 3, '')],
    ["CID", "date", "A", "B", "C", "Row", "SMA"])
ab.show()
+---+---------+-----+-----+----+---+-----+
|CID| date| A| B| C| Row| SMA|
+---+---------+-----+-----+----+---+-----+
| 1| 1/1/2020| 41.0| 0.5| 0.5| 1|10.22|
| 1|10/3/2020| 24.0| 0.3| 0.7| 2| |
| 1|21/5/2020| 32.0| 0.4| 0.6| 3| |
| 2| 3/1/2020| 51.0| 0.22|0.78| 1|34.78|
| 2|10/5/2020|14.56|0.333|0.66| 2| |
| 2|30/9/2020| 17.0| 0.66|0.34| 3| |
+---+---------+-----+-----+----+---+-----+
Expected Output :
+---+---------+-----+-----+----+---+-----+----------+
|CID| date| A| B| C|Row| SMA| EMA|
+---+---------+-----+-----+----+---+-----+----------+
| 1| 1/1/2020| 41.0| 0.5| 0.5| 1|10.22| 10.22|
| 1|10/3/2020| 24.0| 0.3| 0.7| 2| | 14.354|
| 1|21/5/2020| 32.0| 0.4| 0.6| 3| | 21.4124|
| 2| 3/1/2020| 51.0| 0.22|0.78| 1|34.78| 34.78|
| 2|10/5/2020|14.56|0.333|0.66| 2| | 28.04674|
| 2|30/9/2020| 17.0| 0.66|0.34| 3| |20.7558916|
+---+---------+-----+-----+----+---+-----+----------+
Logic: for each customer, if Row == 1 then EMA = SMA, else EMA = C * LAG(EMA) + A * B.
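As a sanity check, the recurrence can be traced by hand for CID 1 in plain Python, using the values from the table above:

```python
# Trace the EMA recurrence for CID 1.
ema = 10.22                    # Row == 1: EMA = SMA
ema = 0.7 * ema + 24.0 * 0.3   # Row 2: C * LAG(EMA) + A * B  -> ~14.354
ema = 0.6 * ema + 32.0 * 0.4   # Row 3                        -> ~21.4124
print(ema)
```

This reproduces the EMA column of the expected output for CID 1.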
Solution
The problem here is that the newly computed value of the previous row is used as input for the current row. That means the computation for a single customer cannot be parallelized.
For Spark 3.0+, the desired result can be obtained with a Pandas UDF using grouped map:
import pyspark.sql.functions as F
import pyspark.sql.types as T

ab = spark.createDataFrame(
    [(1, "1/1/2020", 41.0, 0.5, 0.5, 1, '10.22'),
     (1, "10/3/2020", 24.0, 0.3, 0.7, 2, ''),
     (1, "21/5/2020", 32.0, 0.4, 0.6, 3, ''),
     (2, "3/1/2020", 51.0, 0.22, 0.78, 1, '34.78'),
     (2, "10/5/2020", 14.56, 0.333, 0.66, 2, ''),
     (2, "30/9/2020", 17.0, 0.66, 0.34, 3, '')],
    ["CID", "date", "A", "B", "C", "Row", "SMA"]) \
    .withColumn("SMA", F.col('SMA').cast(T.DoubleType())) \
    .withColumn("date", F.to_date(F.col("date"), "d/M/yyyy"))
import pandas as pd

def calc(df: pd.DataFrame) -> pd.DataFrame:
    # df is a pandas DataFrame holding all rows of one CID group
    df = df.sort_values('date').reset_index(drop=True)
    df.loc[0, 'EMA'] = df.loc[0, 'SMA']
    for i in range(1, len(df)):
        df.loc[i, 'EMA'] = df.loc[i, 'C'] * df.loc[i - 1, 'EMA'] + \
            df.loc[i, 'A'] * df.loc[i, 'B']
    return df
ab.groupBy("CID") \
    .applyInPandas(calc, schema="CID long, date date, A double, B double, C double, Row long, SMA double, EMA double") \
    .show()
Output:
+---+----------+-----+-----+----+---+-----+------------------+
|CID| date| A| B| C|Row| SMA| EMA|
+---+----------+-----+-----+----+---+-----+------------------+
| 1|2020-01-01| 41.0| 0.5| 0.5| 1|10.22| 10.22|
| 1|2020-03-10| 24.0| 0.3| 0.7| 2| null| 14.354|
| 1|2020-05-21| 32.0| 0.4| 0.6| 3| null|21.412399999999998|
| 2|2020-01-03| 51.0| 0.22|0.78| 1|34.78| 34.78|
| 2|2020-05-10|14.56|0.333|0.66| 2| null| 27.80328|
| 2|2020-09-30| 17.0| 0.66|0.34| 3| null| 20.6731152|
+---+----------+-----+-----+----+---+-----+------------------+
The idea is to use a pandas DataFrame for each group. This pandas DataFrame contains all values of the current partition, sorted by date. While iterating over the pandas DataFrame we can access the EMA value of the previous row (which is not possible with a Spark DataFrame).
There are some caveats:
- All rows of one group have to fit into the memory of a single executor. Partial aggregation is not possible here.
- Iterating over a pandas DataFrame is discouraged.
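If the per-row loop inside `calc` ever becomes a bottleneck, the recurrence can in principle be unrolled and vectorized with cumulative products. This is a sketch only, not part of the original answer: `ema_vectorized` is a hypothetical helper, and dividing by the cumulative product can lose precision or underflow when many C values are small, so the loop version is numerically safer.

```python
import numpy as np

# Unrolling EMA[i] = C[i] * EMA[i-1] + A[i] * B[i] with EMA[0] = SMA gives
#   EMA[i] = P[i] * (SMA + sum_{k<=i} A[k] * B[k] / P[k])
# where P[i] is the running product of C (with C[0] treated as 1).
def ema_vectorized(sma0, A, B, C):
    C = C.astype(float).copy()
    C[0] = 1.0                     # first row takes SMA directly
    P = np.cumprod(C)              # P[i] = C[1] * ... * C[i]
    ab = (A * B).astype(float)
    ab[0] = 0.0                    # no A*B contribution on the first row
    return P * (sma0 + np.cumsum(ab / P))

# Values for CID 1 from the table above
ema = ema_vectorized(10.22,
                     np.array([41.0, 24.0, 32.0]),
                     np.array([0.5, 0.3, 0.4]),
                     np.array([0.5, 0.7, 0.6]))
print(ema)  # close to [10.22, 14.354, 21.4124]
```

This removes the Python-level loop inside each group but changes neither the memory caveat nor the per-group sequential semantics.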