微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

用零填充缺失的销售值,并在PySpark中计算3个月的平均值

如何解决用零填充缺失的销售值,并在PySpark中计算3个月的平均值

我要添加零销售额的缺失值并计算pyspark中3个月的平均值

import numpy as np

df[‘<your_column_name>’] = df[‘<your_column_name>’].fillna(‘TBD’)
possible_values = df[‘<your_column_name>’].value_counts().to_dict()

possible_values.pop(‘TBD’)
total_items = sum(possible_values.keys())
possible_values = [(k,v) for k,v in possible_values.items()]
prob_dist = [i[1]/total_items for i in possible_values]

def fill_missing_values(item):
    if item != ‘TBD’:
        index = np.random.choice(np.arange(len(prob_dist),p=prob_dist)
        return possible_values[index]
    return item

df[‘<your_column_name>’] = df[‘<your_column_name>’].apply(lambda x: fill_missing_values(x))

我很惊讶地添加以下内容:凡是销售价值为零而错过日期值的地方。并计算3个月的平均值。

解决方法

您可以使用SparkSQL内置函数transform + sequence创建缺少的月份并设置其sales = 0,使用Window聚合函数计算所需的end_date和最后3个月平均销售。下面,出于说明目的,我将代码分为三个步骤,您可以根据自己的要求将它们合并。

注意:这假设每个不同月份最多有一条记录,并且所有日期值的日期均为day = 1,否则将日期截断为 month 级别,方法是使用{{ 1}}和/或定义重复条目的逻辑。

F.trunc(F.to_date('date','d/M/yyyy'),"month")

第1步::设置WinSpec from pyspark.sql import functions as F,Window df = spark.createDataFrame([ ('A','pharma','1/3/2019',50),('A','1/4/2019',60),'1/5/2019',70),'1/8/2019',80),'ENT','1/9/2019',65),'1/11/2019',40) ],['product','specialty','date','sales']) df = df.withColumn('date',F.to_date('date','d/M/yyyy')) 并使用Window聚合函数lead查找下一个结束日期(w1),将其转换为前几个月以进行设置更新顺序:

w1

第2步::使用w1 = Window.partitionBy('product','specialty').orderBy('date') df1 = df.withColumn('end_date',F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date')) +-------+---------+----------+-----+----------+ |product|specialty| date|sales| end_date| +-------+---------+----------+-----+----------+ | A| ENT|2019-08-01| 50|2019-08-01| | A| ENT|2019-09-01| 65|2019-10-01| | A| ENT|2019-11-01| 40|2019-11-01| | A| pharma|2019-03-01| 50|2019-03-01| | A| pharma|2019-04-01| 60|2019-04-01| | A| pharma|2019-05-01| 70|2019-07-01| | A| pharma|2019-08-01| 80|2019-08-01| +-------+---------+----------+-----+----------+ 计算两个日期之间的月数,并使用transform函数迭代months_between(end_date,date),创建一个date _ {{ 1}}和sales = {sequence(0,#months),使用inline_outer展开结构数组:

add_months(date,i)

步骤3::使用以下WinSpec IF(i=0,sales,0)和聚合函数来计算平均值:

df2 = df1.selectExpr("product","specialty","""
       inline_outer(
         transform(
            sequence(0,int(months_between(end_date,date))),i -> (add_months(date,i) as date,IF(i=0,0) as sales)
         )
       )
   """)
+-------+---------+----------+-----+
|product|specialty|      date|sales|
+-------+---------+----------+-----+
|      A|      ENT|2019-08-01|   50|
|      A|      ENT|2019-09-01|   65|
|      A|      ENT|2019-10-01|    0|
|      A|      ENT|2019-11-01|   40|
|      A|   pharma|2019-03-01|   50|
|      A|   pharma|2019-04-01|   60|
|      A|   pharma|2019-05-01|   70|
|      A|   pharma|2019-06-01|    0|
|      A|   pharma|2019-07-01|    0|
|      A|   pharma|2019-08-01|   80|
+-------+---------+----------+-----+
,

对于缺失的值,您可以做

df.fillna(0,subset=['sales'])

对于3个月的平均值,您可以找到一个很好的答案here,只需小心地正确解析时间戳并将窗口开始日期更改为-90

更新

此代码应该可以完成您要找的工作

days = lambda i: i * 86400
w = (Window.orderBy(f.col("timestampGMT").cast('long')).rangeBetween(-days(90),0))

missings_df = sparkSession.createDataFrame([ ('A','1/6/2019',0)],'sales'])

df = (df
      .union(missings_df) # adding missing row
      .withColumn('timestampGMT',f.to_date('date','d/M/yyyy').cast('timestamp')) # cast to timestamp
      .withColumn('rolling_average',f.avg("sales").over(w)) # rolling average on 90 days
     )

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。