How to calculate the number of days since the last deal date: porting a time-series calculation from pandas to PySpark with Window functions
[Image: pandas_output]
Pandas code:
import pandas as pd

# Calculate days since last deal for customer / master customer
df['booked_date_day'] = pd.to_datetime(df['booked_date_day'])
# Gap to the previous distinct deal date per customer; dropped duplicate (customer, date) rows get NaT
df['customer_whole_days_from_last_deal'] = df[['customer_nbr','booked_date_day']].sort_values(['customer_nbr','booked_date_day']).drop_duplicates().groupby('customer_nbr')['booked_date_day'].diff()
# Forward-fill within each customer so same-day duplicates inherit the gap of their date
df['customer_whole_days_from_last_deal'] = df.sort_values(['customer_nbr','booked_date_day']).groupby('customer_nbr')['customer_whole_days_from_last_deal'].ffill()
df['customer_whole_days_from_last_deal'] = df['customer_whole_days_from_last_deal'].dt.days
# Same calculation at the master-customer level
df['master_customer_whole_days_from_last_deal'] = df[['master_cust','booked_date_day']].sort_values(['master_cust','booked_date_day']).drop_duplicates().groupby('master_cust')['booked_date_day'].diff()
df['master_customer_whole_days_from_last_deal'] = df.sort_values(['master_cust','booked_date_day']).groupby('master_cust')['master_customer_whole_days_from_last_deal'].ffill()
df['master_customer_whole_days_from_last_deal'] = df['master_customer_whole_days_from_last_deal'].dt.days
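To check a PySpark port on a small sample, it helps to pin down exactly what the pandas code computes: sort by key and date, drop duplicate (key, date) pairs, take the day gap to the previous distinct date, then forward-fill so that several deals on the same day share that gap; the first date of each key has no gap. The plain-Python sketch below restates that rule. The function name and toy data are hypothetical, and pandas produces NaN where this sketch returns None.

```python
from datetime import date
from itertools import groupby


def days_since_last_distinct_deal(rows, key):
    """Hypothetical helper: one gap (days, or None) per input row, in input order.

    Mirrors the pandas logic: distinct (key, date) pairs sorted by key and date,
    difference to the previous distinct date, shared by same-day duplicates.
    """
    gaps = {}
    ordered = sorted({(r[key], r["booked_date_day"]) for r in rows})
    for k, grp in groupby(ordered, key=lambda kd: kd[0]):
        prev = None
        for _, d in grp:
            gaps[(k, d)] = None if prev is None else (d - prev).days
            prev = d
    # Every original row (including duplicates) looks up the gap of its date
    return [gaps[(r[key], r["booked_date_day"])] for r in rows]


# Hypothetical toy data: customer 1 has two deals on 2021-01-05
orders = [
    {"customer_nbr": 1, "master_cust": 10, "booked_date_day": date(2021, 1, 1)},
    {"customer_nbr": 1, "master_cust": 10, "booked_date_day": date(2021, 1, 5)},
    {"customer_nbr": 1, "master_cust": 10, "booked_date_day": date(2021, 1, 5)},
    {"customer_nbr": 1, "master_cust": 10, "booked_date_day": date(2021, 1, 15)},
    {"customer_nbr": 2, "master_cust": 10, "booked_date_day": date(2021, 1, 3)},
]
customer_gaps = days_since_last_distinct_deal(orders, "customer_nbr")
master_gaps = days_since_last_distinct_deal(orders, "master_cust")
print(customer_gaps, master_gaps)
```

Both deals on 2021-01-05 get a gap of 4 days for customer 1, not 0; that is the behaviour the PySpark version has to reproduce.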
The code I wrote in PySpark (the pandas DataFrame above is df; the PySpark DataFrame below is orders):
import sys
from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import last
# Gap in days to the previous row of the same customer
window1 = Window.partitionBy(orders.customer_nbr).orderBy(orders.booked_date_day)
orders = orders.withColumn("customer_whole_days_from_last_deal",F.datediff(orders.booked_date_day,F.lag(orders.booked_date_day,1).over(window1)))
# Gap in days to the previous row of the same master customer
window2 = Window.partitionBy(orders.master_cust).orderBy(orders.booked_date_day)
orders = orders.withColumn("master_whole_days_from_last_deal",F.datediff(orders.booked_date_day,F.lag(orders.booked_date_day,1).over(window2)))
# Forward-fill nulls with the last non-null value seen so far in each partition
window_ff1= Window.partitionBy(orders.customer_nbr).orderBy(orders.booked_date_day).rowsBetween(-sys.maxsize,0)
filled_column1 = last(orders['customer_whole_days_from_last_deal'],ignorenulls=True).over(window_ff1)
orders = orders.withColumn('customer_whole_days_from_last_deal',filled_column1)
window_ff2= Window.partitionBy(orders.master_cust).orderBy(orders.booked_date_day).rowsBetween(-sys.maxsize,0)
filled_column2 = last(orders['master_whole_days_from_last_deal'],ignorenulls=True).over(window_ff2)
orders = orders.withColumn('master_whole_days_from_last_deal',filled_column2)
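I am trying to calculate the number of days since the last deal for each customer / master customer. Please let me know what I am doing wrong in PySpark: the pandas results line up, but in PySpark I get more rows.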
Solution
import sys
from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import last
# One row per (customer_nbr, booked_date_day), like drop_duplicates() in pandas
df1 = orders.select('customer_nbr','booked_date_day').sort("customer_nbr","booked_date_day").dropDuplicates()
window1 = Window.partitionBy(df1.customer_nbr).orderBy(df1.booked_date_day)
df1 = df1.withColumn("customer_whole_days_from_last_deal",F.datediff(df1.booked_date_day,F.lag(df1.booked_date_day,1).over(window1)))
window_ff1= Window.partitionBy(df1.customer_nbr).orderBy(df1.booked_date_day).rowsBetween(-sys.maxsize,0)
filled_column1 = last(df1['customer_whole_days_from_last_deal'],ignorenulls=True).over(window_ff1)
df1 = df1.withColumn('customer_whole_days_from_last_deal',filled_column1)
# Same steps at the master-customer level
df2 = orders.select('master_cust','booked_date_day').sort("master_cust","booked_date_day").dropDuplicates()
window2 = Window.partitionBy(df2.master_cust).orderBy(df2.booked_date_day)
df2 = df2.withColumn("master_whole_days_from_last_deal",F.datediff(df2.booked_date_day,F.lag(df2.booked_date_day,1).over(window2)))
window_ff2= Window.partitionBy(df2.master_cust).orderBy(df2.booked_date_day).rowsBetween(-sys.maxsize,0)
filled_column2 = last(df2['master_whole_days_from_last_deal'],ignorenulls=True).over(window_ff2)
df2 = df2.withColumn('master_whole_days_from_last_deal',filled_column2)
Then I joined df1 and df2 back to orders with a broadcast join, which solved my problem.
[Image: pyspark_final_output]