微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

将Spark结构化流用于聚合批处理ETL作业

如何解决将Spark结构化流用于聚合批处理ETL作业

我想设置一个火花ETL批处理作业。我从Databricks在线阅读到,将​​Trigger.Once与spark流一起使用是一个好主意,因为它可以处理与批处理作业相关的工作。

我要创建的作业进入镶木地板表,并执行该字段的groupbycount。样本数据集是:

+-----------+------------+-----------------------+
|file_sha256|machine_guid|time                   |
+-----------+------------+-----------------------+
|1          |a           |2020-10-20 17:26:51.404|
|1          |a           |2020-10-20 17:26:51.404|
|1          |b           |2020-10-20 17:26:51.404|
|1          |null        |2020-10-20 17:26:51.404|
|1          |c           |2020-10-20 17:26:51.404|
|2          |a           |2020-10-20 17:26:51.404|
|2          |b           |2020-10-20 17:26:51.404|
|null       |a           |2020-10-20 17:26:51.404|
|null       |b           |2020-10-20 17:26:51.404|
|3          |null        |2020-10-20 17:26:51.404|
|null       |null        |2020-10-20 17:26:51.404|
|4          |e           |2020-10-20 17:26:51.404|
+-----------+------------+-----------------------+

root
 |-- file_sha256: string (nullable = true)
 |-- machine_guid: string (nullable = true)
 |-- time: timestamp (nullable = false)

使用常规的非流媒体火花我想要做的是:

df_agg = (df
          .groupby('file_sha256')
          .agg(F.count('file_sha256').alias('file_count')))

哪个给出输出

+-----------+----------+
|file_sha256|file_count|
+-----------+----------+
|       null|         0|
|          1|         5|
|          2|         2|
|          3|         1|
|          4|         1|
+-----------+----------+

我想使用Spark Streaming做到这一点,但是在写出聚合数据时总是会出错。我所拥有的是:

df = (spark
      .readStream
      .schema(df_schema)
      .parquet('test_raw_data'))

数据集test_raw_data与上面显示的测试数据集完全相同

df_agg = (df
          .withWatermark('time','1 seconds')
          .groupby('file_sha256')
          .agg(F.count('file_sha256').alias('file_count')))

(df_agg
 .writeStream
 .trigger(once=True)
 .option('checkpointLocation','checkpoint')
 .outputMode('append')
 .start(path='agg_stream'))

尝试此操作时,我不断出现以下错误

Py4JJavaError: An error occurred while calling o93.start.
: org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Aggregate [file_sha256#52],[file_sha256#52,count(file_sha256#52) AS file_count#71L]
+- EventTimeWatermark time#54: timestamp,interval 1 seconds
   +- Streamingrelation DataSource(org.apache.spark.sql.SparkSession@7900ef5f,parquet,List(),Some(StructType(StructField(file_sha256,StringType,true),StructField(machine_guid,StructField(time,TimestampType,false))),None,Map(path -> test_raw_data),None),FileSource[test_raw_data],machine_guid#53,time#54]

我不确定为什么由于聚合中指定的缺少水印而导致失败。我在这里看到过其他帖子,但似乎没有任何作用。

我的问题是,如何将这种聚合作为流数据集进行?

我使用pyspark尝试了此操作,但我认为此问题使用哪种语言并不重要。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。