如何解决用零填充缺失的销售值,并在PySpark中计算3个月的平均值
我要添加零销售额的缺失值并计算pyspark中3个月的平均值
import numpy as np
df[‘<your_column_name>’] = df[‘<your_column_name>’].fillna(‘TBD’)
possible_values = df[‘<your_column_name>’].value_counts().to_dict()
possible_values.pop(‘TBD’)
total_items = sum(possible_values.keys())
possible_values = [(k,v) for k,v in possible_values.items()]
prob_dist = [i[1]/total_items for i in possible_values]
def fill_missing_values(item):
if item != ‘TBD’:
index = np.random.choice(np.arange(len(prob_dist),p=prob_dist)
return possible_values[index]
return item
df[‘<your_column_name>’] = df[‘<your_column_name>’].apply(lambda x: fill_missing_values(x))
我很惊讶地添加以下内容:凡是销售价值为零而错过日期值的地方。并计算3个月的平均值。
解决方法
您可以使用SparkSQL内置函数transform + sequence创建缺少的月份并设置其sales = 0,使用Window聚合函数计算所需的end_date
和最后3个月平均销售。下面,出于说明目的,我将代码分为三个步骤,您可以根据自己的要求将它们合并。
注意:这假设每个不同月份最多有一条记录,并且所有日期值的日期均为day = 1,否则将日期截断为 month 级别,方法是使用{{ 1}}和/或定义重复条目的逻辑。
F.trunc(F.to_date('date','d/M/yyyy'),"month")
第1步::设置WinSpec from pyspark.sql import functions as F,Window
df = spark.createDataFrame([
('A','pharma','1/3/2019',50),('A','1/4/2019',60),'1/5/2019',70),'1/8/2019',80),'ENT','1/9/2019',65),'1/11/2019',40)
],['product','specialty','date','sales'])
df = df.withColumn('date',F.to_date('date','d/M/yyyy'))
并使用Window聚合函数lead查找下一个结束日期(w1),将其转换为前几个月以进行设置更新顺序:
w1
第2步::使用w1 = Window.partitionBy('product','specialty').orderBy('date')
df1 = df.withColumn('end_date',F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
+-------+---------+----------+-----+----------+
|product|specialty| date|sales| end_date|
+-------+---------+----------+-----+----------+
| A| ENT|2019-08-01| 50|2019-08-01|
| A| ENT|2019-09-01| 65|2019-10-01|
| A| ENT|2019-11-01| 40|2019-11-01|
| A| pharma|2019-03-01| 50|2019-03-01|
| A| pharma|2019-04-01| 60|2019-04-01|
| A| pharma|2019-05-01| 70|2019-07-01|
| A| pharma|2019-08-01| 80|2019-08-01|
+-------+---------+----------+-----+----------+
计算两个日期之间的月数,并使用transform函数迭代months_between(end_date,date)
,创建一个date _ {{ 1}}和sales = {sequence(0,#months)
,使用inline_outer展开结构数组:
add_months(date,i)
步骤3::使用以下WinSpec IF(i=0,sales,0)
和聚合函数来计算平均值:
df2 = df1.selectExpr("product","specialty","""
inline_outer(
transform(
sequence(0,int(months_between(end_date,date))),i -> (add_months(date,i) as date,IF(i=0,0) as sales)
)
)
""")
+-------+---------+----------+-----+
|product|specialty| date|sales|
+-------+---------+----------+-----+
| A| ENT|2019-08-01| 50|
| A| ENT|2019-09-01| 65|
| A| ENT|2019-10-01| 0|
| A| ENT|2019-11-01| 40|
| A| pharma|2019-03-01| 50|
| A| pharma|2019-04-01| 60|
| A| pharma|2019-05-01| 70|
| A| pharma|2019-06-01| 0|
| A| pharma|2019-07-01| 0|
| A| pharma|2019-08-01| 80|
+-------+---------+----------+-----+
,
对于缺失的值,您可以做
df.fillna(0,subset=['sales'])
对于3个月的平均值,您可以找到一个很好的答案here,只需小心地正确解析时间戳并将窗口开始日期更改为-90
更新
此代码应该可以完成您要找的工作
days = lambda i: i * 86400
w = (Window.orderBy(f.col("timestampGMT").cast('long')).rangeBetween(-days(90),0))
missings_df = sparkSession.createDataFrame([ ('A','1/6/2019',0)],'sales'])
df = (df
.union(missings_df) # adding missing row
.withColumn('timestampGMT',f.to_date('date','d/M/yyyy').cast('timestamp')) # cast to timestamp
.withColumn('rolling_average',f.avg("sales").over(w)) # rolling average on 90 days
)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。