微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Apache Spark/Azure Data Lake Storage - 只处理一次文件,将文件标记为已处理

如何解决Apache Spark/Azure Data Lake Storage - 只处理一次文件,将文件标记为已处理

我有一个 Azure Data Lake Storage 容器,它充当 JSON 文件的登陆区域,供 Apache Spark 处理。

那里有数以万计的小(最多几 MB)文件。 Spark 代码会定期读取这些文件并执行一些转换。

我希望文件只被读取一次并且 Spark 脚本是幂等的。 如何确保不会一次又一次地读取文件?我如何以有效的方式做到这一点?

我是这样读取数据的:

spark.read.json("/mnt/input_location/*.json")

我考虑了以下方法

  1. 使用已经处理过的文件名创建一个 Delta 表,并在输入 DataFrame 上运行 EXCEPT 转换
  2. 将处理过的文件移动到不同的位置(或重命名它们)。我宁愿不这样做。如果我需要重新处理数据,我需要再次运行重命名此操作需要很长时间。

希望有更好的方法。请提出建议。

解决方法

您可以使用已启用检查点和 Trigger.Once 的结构化流作业。

该作业的检查点文件将跟踪该作业已使用的 JSON 文件。此外,Trigger.Once 触发器将使此流式作业如同批处理作业一样。

来自 Databrick 的一篇很好的文章解释了“为什么 Streaming 和 RunOnce 比 Batch 更好”。

您的结构化流媒体作业可能如下所示:

val checkpointLocation = "/path/to/checkpoints"
val pathToJsonFiles = "/mnt/input_location/"
val streamDF = spark.readStream.format("json").schema(jsonSchema).load(pathToJsonFiles)

val query = streamDF
  .[...] // apply your processing
  .writeStream
  .format("console") // change sink format accordingly
  .option("checkpointLocation",checkpointLocation)
  .trigger(Trigger.Once)
  .start()

query.awaitTermination()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?