如何解决如何在pyspark数据帧中计算每日基础时间序列
所以我有一个数据框,我想每天计算一些数量。.假设我们有10列col1,col2,col3,col4 ... coln,每列都取决于值{{1} },col2,col3,col4 ..等,日期根据col1
..
id
假设我们执行此数据帧,此df中可能会有更多列...
为了明确起见,我们假设今天的日期是2020-08-01。然后我们进行一些计算,然后在coln得到一些输出,比如说 +--------+----+---- +----+
date |col1|id |col2|. . |coln
+--------+----+---- +----+
2020-08-01| 0| M1 | . . . 3|
2020-08-02| 4| M1 | 10|
2020-08-03| 3| M1 | . . . 9 |
2020-08-04| 2| M1 | . . . 8 |
2020-08-05| 1| M1 | . . . 7 |
2020-08-06| 0| M1 | . . . 0 |
2020-08-01| 0| M2 | . . . 0 |
2020-08-02| 0| M2 | . . . . 1 |
2020-08-03| 0| M2 | . . . . 2 |
+---------+----+----+-----------------+
在2020-08-01,我想在2020-08-02 coln == col1,即col1 == 3并进行2020-08-02进行计算,依此类推...因此df的示例如下所示
coln =3
如果你们能给我一个如何在pyspark中完成此操作的示例,那就太好了。
示例:假设 +--------+----+---- +----+
date |col1|id |col2|. . |coln
+--------+----+---- +----+
2020-08-01| 0| M1 | . . . 3|
2020-08-02| 3| M1 | 10|
2020-08-03|10| M1 | . . . 9 |
2020-08-04| 9| M1 | . . . 8 |
2020-08-05| 8| M1 | . . . 7 |
2020-08-06| 7| M1 | . . . 0 |
2020-08-01| 0| M2 | . . . 1 |
2020-08-02| 1| M2 | . . . . 2 |
2020-08-03| 2| M2 | . . . . 0 |
+---------+----+----+-----------------+
,最初,假设col1均为0。
col3 = col1+ col2
所以让我们集中在df1_schema = StructType([StructField("Date",StringType(),True),\
StructField("col1",IntegerType(),\
StructField("id",\
StructField("col2",\
StructField("col3",\
StructField("coln",True)])
df_data = [('2020-08-01','M1',3,2),('2020-08-02',2,1),\
('2020-08-03',3),('2020-08-04',\
('2020-08-01','M2',1,-1,2)]
rdd = sc.parallelize(df_data)
df1 = sqlContext.createDataFrame(df_data,df1_schema)
df1 = df1.withColumn("Date",to_date("Date",'yyyy-MM-dd'))
df1.show()
+----------+----+---+----+----+----+
| Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2|
|2020-08-02| 0| M1| 2| 3| 1|
|2020-08-03| 0| M1| 3| 3| 3|
|2020-08-04| 0| M1| 3| 3| 1|
|2020-08-01| 0| M2| 1| 3| 1|
|2020-08-02| 0| M2| -1| 3| 2|
+----------+----+---+----+----+----+
上,而我们想要的是col1 + col2,即3 = col3。在依赖于col3的第n次计算之后。col4 ... col5 ..假设我们得到了一些数字coln =3。在完成计算之后,我们希望在2020-08-01
处,coln = 3应该是在col1
因此,在2020-08-01计算完成后,它是动态变化的
所以我想要的df看起来像这样
2020-08-02
编辑2:
+----------+----+---+----+----+----+
| Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2|
|2020-08-02| 2| M1| 2| 5| 1|
|2020-08-03| 1| M1| 3| 4| 3|
|2020-08-04| 3| M1| 3| 6| 1|
|2020-08-01| 1| M2| 1| 4| 1|
|2020-08-02| 1| M2| -1| 0| 2|
+----------+----+---+----+----+----+
所以我们说coln = col4-然后是col2
df1_schema = StructType([StructField("Date",\
StructField("col4",'yyyy-MM-dd'))
df1.show()
+----------+----+---+----+----+----+----+
| Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01| 0| M1| 3| 3| 2| 2|
|2020-08-02| 0| M1| 2| 3| 0| 1|
|2020-08-03| 0| M1| 3| 3| 2| 3|
|2020-08-04| 0| M1| 3| 3| 2| 1|
|2020-08-01| 0| M2| 1| 3| 3| 1|
|2020-08-02| 0| M2| -1| 3| 1| 2|
+----------+----+---+----+----+----+----+
解决方法
这是您可以使用SparkSQL内置函数aggregate处理的一种类型的问题(要求 Spark 2.4 + ),以下概述了基本概念:
from pyspark.sql.functions import sort_array,collect_list,struct,to_date
cols = ['Date','col1','col2','col3','coln']
df_new = df1.groupby('id') \
.agg(sort_array(collect_list(struct(*cols))).alias('dta')) \
.selectExpr("id","""
inline(
aggregate(
/* expr: iterate through the array `dta` from the 2nd to the last items*/
slice(dta,2,size(dta)-1),/* start: AKA. the zero value which is an array of structs
* with a single element dta[0]
*/
array(dta[0]),/* merge: do the calculations */
(acc,x) ->
concat(acc,array(named_struct(
'Date',x.Date,element_at(acc,-1).coln,x.col2,-1).col3 + x.col2,'coln',x.col3 - x.col2
)))
)
)
""")
输出:
df_new.show()
+---+----------+----+----+----+----+
| id| Date|col1|col2|col3|coln|
+---+----------+----+----+----+----+
| M1|2020-08-01| 0| 3| 3| 2|
| M1|2020-08-02| 2| 2| 5| 1|
| M1|2020-08-03| 1| 3| 8| 0|
| M1|2020-08-04| 0| 3| 11| 0|
| M2|2020-08-01| 0| 1| 3| 1|
| M2|2020-08-02| 1| -1| 2| 4|
+---+----------+----+----+----+----+
位置:
-
我们将同一行
id
的行分组并按Date
对其进行排序,将结果结构数组命名为dta
-
在聚合函数中,我们使用结构数组
acc
初始化array(dta[0])
,然后使用{{从第二项到最后一项遍历数组dta
3}}功能 -
在聚合函数的
merge
部分中,您可以使用x.col1
,x.coln
等引用同一日期的值并使用element_at(acc,-1).col1
,element_at(acc,-1).coln
等以引用上一个日期中的值。 -
在合并功能中,我们使用
的数组中concat(acc,array(...))
将新元素附加到结构体acc
-
使用slice函数爆炸以上结构数组
acc
-
此假定的日期是连续的,如果存在缺少的日期,则可以添加一些IF条件。例如,计算下面的
col3
:IF(datediff(x.Date,-1).Date) = 1,0) + x.col2
顺便说一句。我没有使用示例coln = col4 - col2
,而是使用con3 = col3_prev + col2
,这是一个更好的示例。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。