微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

pyspark 结构化流与水印聚合问题

如何解决pyspark 结构化流与水印聚合问题

我在使用结构化流聚合时遇到了麻烦。数据来自 Kafka,没有问题,我试图在一些简单的聚合后将它写回镶木地板文件。这在批处理模式下也很好用。问题是当我尝试启动 writeStream 时:

错误

pyspark.sql.utils.AnalysisException: u'Append output mode not 当流上有流聚合时支持 无水印的DataFrames/DataSets;

但我在聚合上使用水印:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers","*****,******") \
  .option("subscribe","test") \
  .option("startingOffsets","earliest")\
  .load()

df1=df.selectExpr("CAST(key AS STRING) key","CAST(value AS STRING) value","timestamp")
df2=df1.withColumn("value",from_json("value",schemaStr))
df3=df2.selectExpr("key","value.JournaleNro JournaleNro","value.RecordId RecordId","inline_outer(value.result)","to_timestamp(value.transcStartDate) transcStartDate","timestamp")
df4=df3.withColumn('statCalcDateTime',current_timestamp()) \
    .withWatermark("timestamp","10 minutes") \
    .groupBy(
        window(df3.timestamp,"10 minutes","5 minutes"),"key","RecordId","transcStartDate").agg(
            avg("conf").alias("avg_confianza"),max("conf").alias("max_confianza"),min("conf").alias("min_confianza"),count("*").alias("cnt_palabras"),countdistinct('word').alias('dist_palabras'))

df4.writeStream\
.format("parquet") \
.option("checkpointLocation",path_checkpointLocation) \
.option("path",path_ods_trec_stats) \
.start()


pyspark 2.4.0, 蟒蛇2.7.6, Cloudera CDH 5.13 上的 kafka 4.10

任何想法/建议将不胜感激

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。