How to resolve a watermarked aggregation problem in PySpark Structured Streaming
I'm having trouble with a Structured Streaming aggregation. The data arrives from Kafka without issues, and I'm trying to write it back to Parquet files after some simple aggregations. The same logic works fine in batch mode. The problem appears when I try to start the writeStream:
Error:
pyspark.sql.utils.AnalysisException: u'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
But I am using a watermark on the aggregation:
from pyspark.sql.functions import (from_json, current_timestamp, window,
                                   avg, max, min, count, countDistinct)

# Read the raw stream from Kafka
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "*****,******") \
    .option("subscribe", "test") \
    .option("startingOffsets", "earliest") \
    .load()

# Cast the Kafka key/value to strings and keep the event timestamp
df1 = df.selectExpr("CAST(key AS STRING) key", "CAST(value AS STRING) value", "timestamp")

# schemaStr is the JSON schema, defined elsewhere
df2 = df1.withColumn("value", from_json("value", schemaStr))
df3 = df2.selectExpr("key", "value.JournaleNro JournaleNro", "value.RecordId RecordId",
                     "inline_outer(value.result)",
                     "to_timestamp(value.transcStartDate) transcStartDate", "timestamp")

# Watermarked sliding-window aggregation
df4 = df3.withColumn('statCalcDateTime', current_timestamp()) \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window(df3.timestamp, "10 minutes", "5 minutes"),
             "key", "RecordId", "transcStartDate") \
    .agg(avg("conf").alias("avg_confianza"),
         max("conf").alias("max_confianza"),
         min("conf").alias("min_confianza"),
         count("*").alias("cnt_palabras"),
         countDistinct('word').alias('dist_palabras'))

df4.writeStream \
    .format("parquet") \
    .option("checkpointLocation", path_checkpointLocation) \
    .option("path", path_ods_trec_stats) \
    .start()
PySpark 2.4.0, Python 2.7.6, Kafka 4.10 on Cloudera CDH 5.13
Any ideas/suggestions would be greatly appreciated.