如何在pyspark数据帧中计算每日基础时间序列

如何解决如何在pyspark数据帧中计算每日基础时间序列

所以我有一个数据框,我想每天计算一些数量。.假设我们有10列col1,col2,col3,col4 ... coln,每列都取决于值{{1} },col2,col3,col4 ..等,日期根据col1 ..

重置。
id

假设我们执行此数据帧,此df中可能会有更多列... 为了明确起见,我们假设今天的日期是2020-08-01。然后我们进行一些计算,然后在coln得到一些输出,比如说 +--------+----+---- +----+ date |col1|id |col2|. . |coln +--------+----+---- +----+ 2020-08-01| 0| M1 | . . . 3| 2020-08-02| 4| M1 | 10| 2020-08-03| 3| M1 | . . . 9 | 2020-08-04| 2| M1 | . . . 8 | 2020-08-05| 1| M1 | . . . 7 | 2020-08-06| 0| M1 | . . . 0 | 2020-08-01| 0| M2 | . . . 0 | 2020-08-02| 0| M2 | . . . . 1 | 2020-08-03| 0| M2 | . . . . 2 | +---------+----+----+-----------------+ 在2020-08-01,我想在2020-08-02 coln == col1,即col1 == 3并进行2020-08-02进行计算,依此类推...因此df的示例如下所示

coln =3

如果你们能给我一个如何在pyspark中完成此操作的示例,那就太好了。

示例:假设 +--------+----+---- +----+ date |col1|id |col2|. . |coln +--------+----+---- +----+ 2020-08-01| 0| M1 | . . . 3| 2020-08-02| 3| M1 | 10| 2020-08-03|10| M1 | . . . 9 | 2020-08-04| 9| M1 | . . . 8 | 2020-08-05| 8| M1 | . . . 7 | 2020-08-06| 7| M1 | . . . 0 | 2020-08-01| 0| M2 | . . . 1 | 2020-08-02| 1| M2 | . . . . 2 | 2020-08-03| 2| M2 | . . . . 0 | +---------+----+----+-----------------+ ,最初,假设col1均为0。

col3 = col1+ col2

所以让我们集中在df1_schema = StructType([StructField("Date",StringType(),True),\ StructField("col1",IntegerType(),\ StructField("id",\ StructField("col2",\ StructField("col3",\ StructField("coln",True)]) df_data = [('2020-08-01','M1',3,2),('2020-08-02',2,1),\ ('2020-08-03',3),('2020-08-04',\ ('2020-08-01','M2',1,-1,2)] rdd = sc.parallelize(df_data) df1 = sqlContext.createDataFrame(df_data,df1_schema) df1 = df1.withColumn("Date",to_date("Date",'yyyy-MM-dd')) df1.show() +----------+----+---+----+----+----+ | Date|col1| id|col2|col3|coln| +----------+----+---+----+----+----+ |2020-08-01| 0| M1| 3| 3| 2| |2020-08-02| 0| M1| 2| 3| 1| |2020-08-03| 0| M1| 3| 3| 3| |2020-08-04| 0| M1| 3| 3| 1| |2020-08-01| 0| M2| 1| 3| 1| |2020-08-02| 0| M2| -1| 3| 2| +----------+----+---+----+----+----+ 上,而我们想要的是col1 + col2,即3 = col3。在依赖于col3的第n次计算之后。col4 ... col5 ..假设我们得到了一些数字coln =3。在完成计算之后,我们希望在2020-08-01处,coln = 3应该是在col1 因此,在2020-08-01计算完成后,它是动态变化的

enter image description here

所以我想要的df看起来像这样

2020-08-02

编辑2:

+----------+----+---+----+----+----+
|      Date|col1| id|col2|col3|coln|
+----------+----+---+----+----+----+
|2020-08-01|   0| M1|   3|   3|   2|
|2020-08-02|   2| M1|   2|   5|   1|
|2020-08-03|   1| M1|   3|   4|   3|
|2020-08-04|   3| M1|   3|   6|   1|
|2020-08-01|   1| M2|   1|   4|   1|
|2020-08-02|   1| M2|  -1|   0|   2|
+----------+----+---+----+----+----+

所以我们说coln = col4-然后是col2

df1_schema = StructType([StructField("Date",\
                       StructField("col4",'yyyy-MM-dd'))
df1.show()
+----------+----+---+----+----+----+----+
|      Date|col1| id|col2|col3|col4|coln|
+----------+----+---+----+----+----+----+
|2020-08-01|   0| M1|   3|   3|   2|   2|
|2020-08-02|   0| M1|   2|   3|   0|   1|
|2020-08-03|   0| M1|   3|   3|   2|   3|
|2020-08-04|   0| M1|   3|   3|   2|   1|
|2020-08-01|   0| M2|   1|   3|   3|   1|
|2020-08-02|   0| M2|  -1|   3|   1|   2|
+----------+----+---+----+----+----+----+

解决方法

这是您可以使用SparkSQL内置函数aggregate处理的一种类型的问题(要求 Spark 2.4 + ),以下概述了基本概念:

from pyspark.sql.functions import sort_array,collect_list,struct,to_date

cols = ['Date','col1','col2','col3','coln']

df_new = df1.groupby('id') \
    .agg(sort_array(collect_list(struct(*cols))).alias('dta')) \
    .selectExpr("id","""  
      inline( 
        aggregate( 
          /* expr: iterate through the array `dta` from the 2nd to the last items*/
          slice(dta,2,size(dta)-1),/* start: AKA. the zero value which is an array of structs 
           * with a single element dta[0]
           */
          array(dta[0]),/* merge: do the calculations */
          (acc,x) ->   
            concat(acc,array(named_struct( 
              'Date',x.Date,element_at(acc,-1).coln,x.col2,-1).col3 + x.col2,'coln',x.col3 - x.col2 
            )))  
         )    
       )    
   """)

输出:

df_new.show()
+---+----------+----+----+----+----+ 
| id|      Date|col1|col2|col3|coln|
+---+----------+----+----+----+----+
| M1|2020-08-01|   0|   3|   3|   2|
| M1|2020-08-02|   2|   2|   5|   1|
| M1|2020-08-03|   1|   3|   8|   0|
| M1|2020-08-04|   0|   3|  11|   0|
| M2|2020-08-01|   0|   1|   3|   1|
| M2|2020-08-02|   1|  -1|   2|   4|
+---+----------+----+----+----+----+

位置:

  1. 我们将同一行id的行分组并按Date对其进行排序,将结果结构数组命名为dta

  2. 在聚合函数中,我们使用结构数组acc初始化array(dta[0]),然后使用{{从第二项到最后一项遍历数组dta 3}}功能

  3. 在聚合函数的merge部分中,您可以使用x.col1x.coln等引用同一日期的值并使用element_at(acc,-1).col1element_at(acc,-1).coln等以引用上一个日期中的值。

  4. 在合并功能中,我们使用concat(acc,array(...))将新元素附加到结构体acc

    的数组中
  5. 使用slice函数爆炸以上结构数组acc

  6. 此假定的日期是连续的,如果存在缺少的日期,则可以添加一些IF条件。例如,计算下面的col3

    IF(datediff(x.Date,-1).Date) = 1,0) + x.col2
    

顺便说一句。我没有使用示例coln = col4 - col2,而是使用con3 = col3_prev + col2,这是一个更好的示例。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -> systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping("/hires") public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate<String
使用vite构建项目报错 C:\Users\ychen\work>npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)> insert overwrite table dwd_trade_cart_add_inc > select data.id, > data.user_id, > data.course_id, > date_format(
错误1 hive (edu)> insert into huanhuan values(1,'haoge'); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive> show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 <configuration> <property> <name>yarn.nodemanager.res