如何解决使用 Glue 将多个数据文件合并为一个文件 - 作业成功但没有输出文件
TL;DR
- 我正在尝试使用 Glue [Studio] 作业将许多 S3 数据文件合并为较少数量的数据
- 输入数据在 Glue 中编目并可通过 Athena 查询
- Glue 作业以“成功”输出状态运行,但未创建任何输出文件
详情
输入 我的数据是以每分钟一次的周期从爬虫创建的。它将 JSON (gzip) 格式的输出转储到存储桶中。我在 Glue 中对这个存储桶进行了编目,并且可以使用 Athena 对其进行查询,没有错误。这让我更加确信我已经正确设置了目录和数据结构。单独来说,这并不理想,因为它每天创建约 1.4K 文件,这使得对数据的查询(通过 Athena)非常缓慢,因为他们必须扫描太多太小的文件
目标我想定期(可能每周一次,每月一次,我不确定)将每分钟一次的文件合并到更少的文件中,以便查询扫描更大和更少的文件(更快的查询)。
方法 我的计划是创建一个 Glue ETL 作业(使用 Glue Studio)从目录表中读取,并写入一个新的 S3 位置(保持相同的 JSON-gzip 格式,所以我只需将 Glue 表重新指向带有合并文件的新 S3 位置)。我使用 Glue Studio 设置了作业,当我运行它时,它说成功了,但是指定的 S3 位置没有输出(不是空文件,在全部)。
卡住了!我有点不知所措,因为 (1) 它说它成功了,并且 (2) 我什至没有修改脚本(见下文),所以我假设(可能是个坏主意)不是那样的。
日志 我已经尝试浏览 CloudWatch 日志,看看它是否有帮助,但我没有得到太多。我怀疑它可能与此条目有关,但我找不到确认或更改任何内容以“修复”它的方法。 (该路径确实存在,通过我可以在 S3 中看到它这一事实进行了验证,Catalog 可以按照 Athena 查询的验证对其进行搜索,并且它是由 Glue Studio 脚本构建器自动生成的。)对我来说,这听起来像我我在某处选择了一个选项,使它认为我只想要某种“增量”数据扫描。但我没有(有意地),也找不到任何能让我觉得我有的地方。
CloudWatch 日志条目
21/03/13 17:59:39 WARN HadoopDataSource: Skipping Partition {} as no new files detected @ s3://my_bucket/my_folder/my_source_data/ or path does not exist
胶水脚本
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv,['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'],args)
## @type: DataSource
## @args: [database = "my_database",table_name = "my_table",transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "my_database",transformation_ctx = "DataSource0")
## @type: DataSink
## @args: [connection_type = "s3",format = "json",connection_options = {"path": "s3://my_bucket/my_folder/consolidation/","compression": "gzip","partitionKeys": []},transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = DataSource0]
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0,connection_type = "s3",transformation_ctx = "DataSink0")
job.commit()
我首先研究的其他帖子
没有一个“成功”作业不提供输出的相同问题。但是,一个创建了空文件,而另一个创建了太多文件。最有趣的方法是使用 Athena 为您创建新的输出文件(使用外部表);但是,当我调查时,似乎输出格式选项没有 JSON-gzip(或没有 gzip 的 JSON),而只有 CSV 和 Parquet,我不喜欢使用它们。
How to Convert Many CSV files to Parquet using AWS Glue
AWS Glue: ETL job creates many empty output files
AWS Glue Job - Writing into single Parquet file
AWS Glue,output one file with partitions
解决方法
datasource_df = DataSource0.repartition(1)
DataSink0 =glueContext.write_dynamic_frame.from_options(frame = datasource_df,connection_type = "s3",format = "json",connection_options = {"path": "s3://my_bucket/my_folder/consolidation/","compression" : "gzip","partitionKeys": []},transformation_ctx = "DataSink0")
job.commit()
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。