如何解决如何滞后非连续日期
我需要计算col(current_month) / previous(month) partitioned by id
的问题是数据不连续,所以我无法执行lag(price) partition by month
,因为2018-04-01
的上一行是2018-02-01
。 / p>
我希望在呼叫时我希望使用更优雅的解决方案,例如example_1
example_2
,而不是下面的lag('price').over(partitionBy('id').rangeBetween('1 month',0)
使用join或<- pseudo code
使用嵌套
这可能吗?还是我没有想到的第三种选择?
Input:
+----------+-----+--------+-------+
| month|price|quantity| id|
+----------+-----+--------+-------+
|2018-01-01| 3.96| 53.0|abc##10|
|2018-02-01| 3.96| 49.0|abc##10|
|2018-04-01| 3.81| 150.0|abc##10|
|2018-05-01| 3.81| 14.0|abc##10|
|2018-06-01| 3.73| 13.0|abc##10|
|2018-08-01| 2.97| 27.0|abc##10|
|2018-09-01| 2.97| 22.0|abc##10|
|2018-10-01| 2.97| 10.0|abc##10|
|2018-11-01| 2.97| 35.0|abc##10|
|2018-12-01| 2.97| 99.0|abc##10|
+----------+-----+--------+-------+
output:
# I need the previous month's column to calculate col(current_month) / previous(month) partitioned by id
+----------+-----+--------+-------+----------------------+
| month|price|quantity| id| previous_months_price|
+----------+-----+--------+-------+----------------------+
|2018-01-01| 3.96| 53.0|abc##10| null|
|2018-02-01| 3.96| 49.0|abc##10| 53.0|
|2018-04-01| 3.81| 150.0|abc##10| null|
|2018-05-01| 3.81| 14.0|abc##10| 150.0|
|2018-06-01| 3.73| 13.0|abc##10| 14.0|
|2018-08-01| 2.97| 27.0|abc##10| null|
|2018-09-01| 2.97| 22.0|abc##10| 27.0|
|2018-10-01| 2.97| 10.0|abc##10| 22.0|
|2018-11-01| 2.97| 35.0|abc##10| 10.0|
|2018-12-01| 2.97| 99.0|abc##10| 35.0|
+----------+-----+--------+-------+----------------------+
example_1
lj = df_t.select(
'id',F.add_months('month',1).alias('month'),F.col('price').alias('previous_months_price'),)
df_t.join(lj,['id','month'],how='left')
example_2
nxt_dt = F.add_months('month',1)
df_t.withColumn(
'previous_month_price',F.when(
nxt_dt == F.expr('lag(month) over (partition by id order by id,month)'),F.expr('lag(quantity) over (partition by id order by id,)
)
解决方法
使用窗口功能时,还请包括上一行的月份。计算结果列时,如果当前月与上个月的差不等于1,则将其设置为null。
MPMusicPlayerController
打印
df = spark.read.option("header",True).csv(...) \
.withColumn("month",F.to_date("month"))
w = Window.partitionBy("id").orderBy("month")
df.withColumn("prev_qty",F.lag("quantity").over(w)) \
.withColumn("prev_month",F.lag("month").over(w)) \
.withColumn("previous_months_qty",F.expr("case when last_day(add_months(prev_month,1)) = last_day(month) then prev_qty else null end")) \
.drop("prev_qty","prev_month") \
.show()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。