如何解决在Pyspark结构化流中,如何在写入Kafka之前丢弃已经生成的输出?
我正在尝试对Kafka源数据进行结构化流传输(Spark 2.4.0),在该源数据上我将读取最新数据并在10分钟的窗口内执行汇总。写入数据时,我正在使用“更新”模式。
例如,数据模式如下:
tx_id,cust_id,product,timestamp
我的目标是找到最近10分钟内购买了3种以上产品的客户。假设 prod 是从kafka读取的数据框,然后 windowed_df 定义为:
windowed_df_1 = prod.groupBy(window("timestamp","10 minutes"),cust_id).count()
windowed_df = windowed_df_1.filter(col("count")>=3)
然后,我将其与配置单元表“ customer_master”中的主数据框合并,以获取cust_name
:
final_df = windowed_df.join(customer_master,"cust_id")
最后,将此数据帧写入Kafka接收器(或为简单起见,控制台)
query = final_df.writeStream.outputMode("update").format("console").option("truncate",False).trigger(processingTime='2 minutes').start()
query.awaitTermination()
现在,当此代码每2分钟运行一次时,在随后的运行中,我想舍弃所有已经属于我的输出的客户。即使他们再次购买任何产品,我也不希望他们出现在我的输出中。
我可以将流输出临时写在(可能是配置单元表)的某个地方,并为每次执行做一个“ anti-join ”吗? 这样,我还可以在配置单元表中保留历史记录。
我还在某个地方读到了可以将输出写入内存接收器的地方,然后使用df.write
将其保存在HDFS / Hive中。 但是如果我们终止工作并重新运行该怎么办?在这种情况下,内存表将丢失。
由于我是结构化流媒体的新手,请提供帮助。
**
- 更新:-
** 我还尝试了下面的代码在Hive表和控制台(或Kafka接收器)中写入输出:
def write_to_hive(df,epoch_id):
df.persist()
df.write.format("hive").mode("append").saveAsTable("hive_tab_name")
pass
final_df.writeStream.outputMode("update").format("console").option("truncate",False).start()
final_df.writeStream.outputMode("update").foreachBatch(write_to_hive).start()
但这仅执行第一个动作,即写入控制台。 如果我先写“ foreachBatch”,它将保存到Hive表中,但不会打印到控制台。
我想写两个不同的接收器。请帮忙。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。