How to get the weighted average of a column in PySpark

I need to compute an exponential moving average in a Spark DataFrame. Input table:

ab = spark.createDataFrame(
    [(1, "1/1/2020", 41.0, 0.5, 0.5, 1, '10.22'),
     (1, "10/3/2020", 24.0, 0.3, 0.7, 2, ''),
     (1, "21/5/2020", 32.0, 0.4, 0.6, 3, ''),
     (2, "3/1/2020", 51.0, 0.22, 0.78, 1, '34.78'),
     (2, "10/5/2020", 14.56, 0.333, 0.66, 2, ''),
     (2, "30/9/2020", 17.0, 0.66, 0.34, 3, '')],
    ["CID", "date", "A", "B", "C", "Row", "SMA"])
ab.show()

+---+---------+-----+-----+----+---+-----+
|CID|     date|    A|    B|   C|Row|  SMA|
+---+---------+-----+-----+----+---+-----+
|  1| 1/1/2020| 41.0|  0.5| 0.5|  1|10.22|
|  1|10/3/2020| 24.0|  0.3| 0.7|  2|     |
|  1|21/5/2020| 32.0|  0.4| 0.6|  3|     |
|  2| 3/1/2020| 51.0| 0.22|0.78|  1|34.78|
|  2|10/5/2020|14.56|0.333|0.66|  2|     |
|  2|30/9/2020| 17.0| 0.66|0.34|  3|     |
+---+---------+-----+-----+----+---+-----+

Expected output:

+---+---------+-----+-----+----+---+-----+----------+
|CID|     date|    A|    B|   C|Row|  SMA|       EMA|
+---+---------+-----+-----+----+---+-----+----------+
|  1| 1/1/2020| 41.0|  0.5| 0.5|  1|10.22|     10.22|
|  1|10/3/2020| 24.0|  0.3| 0.7|  2|     |    14.354|
|  1|21/5/2020| 32.0|  0.4| 0.6|  3|     |   21.4124|
|  2| 3/1/2020| 51.0| 0.22|0.78|  1|34.78|     34.78|
|  2|10/5/2020|14.56|0.333|0.66|  2|     |  28.04674|
|  2|30/9/2020| 17.0| 0.66|0.34|  3|     |20.7558916|
+---+---------+-----+-----+----+---+-----+----------+

Logic: for each customer (CID), if Row == 1 then EMA = SMA; otherwise EMA = C * LAG(EMA) + A * B.
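
To make the recursion concrete with the CID 1 numbers: row 1 seeds EMA = SMA = 10.22; row 2 gives 0.7 * 10.22 + 24.0 * 0.3 = 14.354; row 3 gives 0.6 * 14.354 + 32.0 * 0.4 = 21.4124, matching the expected output above.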

Solution

The problem here is that the value newly computed for the previous row is the input for the current row. This means the calculation for a single customer cannot be parallelized.

For Spark 3.0+, the desired result can be obtained with a grouped-map Pandas function (applyInPandas):
from pyspark.sql import functions as F
from pyspark.sql import types as T

ab = spark.createDataFrame(
    [(1, "1/1/2020", 41.0, 0.5, 0.5, 1, '10.22'),
     (1, "10/3/2020", 24.0, 0.3, 0.7, 2, ''),
     (1, "21/5/2020", 32.0, 0.4, 0.6, 3, ''),
     (2, "3/1/2020", 51.0, 0.22, 0.78, 1, '34.78'),
     (2, "10/5/2020", 14.56, 0.333, 0.66, 2, ''),
     (2, "30/9/2020", 17.0, 0.66, 0.34, 3, '')],
    ["CID", "date", "A", "B", "C", "Row", "SMA"]) \
    .withColumn("SMA", F.col("SMA").cast(T.DoubleType())) \
    .withColumn("date", F.to_date(F.col("date"), "d/M/yyyy"))

import pandas as pd

def calc(df: pd.DataFrame) -> pd.DataFrame:
    # df is a pandas DataFrame holding all rows of one CID group
    df = df.sort_values('date').reset_index(drop=True)
    # seed the recursion: on the first row, EMA equals SMA
    df.loc[0, 'EMA'] = df.loc[0, 'SMA']
    # every following row uses the EMA just computed for the previous row
    for i in range(1, len(df)):
        df.loc[i, 'EMA'] = df.loc[i, 'C'] * df.loc[i - 1, 'EMA'] \
            + df.loc[i, 'A'] * df.loc[i, 'B']
    return df
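
Since calc is plain pandas code, it can be sanity-checked locally, without Spark, against the CID 1 values from the table above (a quick sketch; pdf is just an illustrative test frame):

pdf = pd.DataFrame({
    'CID': [1, 1, 1],
    'date': pd.to_datetime(['2020-01-01', '2020-03-10', '2020-05-21']),
    'A': [41.0, 24.0, 32.0],
    'B': [0.5, 0.3, 0.4],
    'C': [0.5, 0.7, 0.6],
    'Row': [1, 2, 3],
    'SMA': [10.22, None, None],
})
print(calc(pdf)['EMA'].tolist())  # approx. [10.22, 14.354, 21.4124]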

ab.groupBy("CID").applyInPandas(calc,schema = "CID long,date date,A double,B double,C double,Row long,SMA double,EMA double")\
    .show()

Output:

+---+----------+-----+-----+----+---+-----+------------------+
|CID|      date|    A|    B|   C|Row|  SMA|               EMA|
+---+----------+-----+-----+----+---+-----+------------------+
|  1|2020-01-01| 41.0|  0.5| 0.5|  1|10.22|             10.22|
|  1|2020-03-10| 24.0|  0.3| 0.7|  2| null|            14.354|
|  1|2020-05-21| 32.0|  0.4| 0.6|  3| null|21.412399999999998|
|  2|2020-01-03| 51.0| 0.22|0.78|  1|34.78|             34.78|
|  2|2020-05-10|14.56|0.333|0.66|  2| null|          27.80328|
|  2|2020-09-30| 17.0| 0.66|0.34|  3| null|        20.6731152|
+---+----------+-----+-----+----+---+-----+------------------+

The idea is to use one pandas DataFrame per group. That pandas DataFrame contains all rows of the current group, sorted by date. While iterating over the pandas DataFrame we can access the EMA value of the previous row, which is not possible with a Spark DataFrame.
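
As a side note, the output schema passed to applyInPandas does not have to be spelled out as a DDL string; it can also be derived from the input DataFrame. A minimal sketch (the out_schema name is illustrative; T is pyspark.sql.types as imported above):

out_schema = T.StructType(ab.schema.fields).add("EMA", T.DoubleType())
ab.groupBy("CID").applyInPandas(calc, schema=out_schema).show()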

There are a few caveats:

  • All rows of one group have to fit into the memory of a single executor; partial aggregation is not possible here.
  • Iterating over a pandas DataFrame row by row is discouraged (a vectorized alternative is sketched below).
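
Regarding the second caveat: because the EMA here is a linear recurrence, the Python-level loop can be traded for cumulative products and sums. Below is a vectorized sketch of a drop-in replacement for calc (the calc_vectorized name is illustrative; be aware that the cumulative product of C values below 1 shrinks quickly, so the division can become numerically fragile for long series):

import numpy as np

def calc_vectorized(df: pd.DataFrame) -> pd.DataFrame:
    # same contract as calc(): one CID group in, the group plus EMA out
    df = df.sort_values('date').reset_index(drop=True)
    c = df['C'].to_numpy(dtype=float, copy=True)
    c[0] = 1.0                        # the first row has no lagged EMA term
    cp = np.cumprod(c)                # cp[i] = C_2 * ... * C_i (row 1 -> 1.0)
    p = (df['A'] * df['B']).to_numpy(dtype=float)
    p[0] = df.loc[0, 'SMA']           # seed the recurrence with SMA
    # dividing by cp turns EMA_i = C_i * EMA_(i-1) + A_i * B_i into a cumsum
    df['EMA'] = cp * np.cumsum(p / cp)
    return df

It plugs into the same applyInPandas call in place of calc.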
