
How to fix a Glue Studio job that creates empty tables/columns

Every time I run my Glue Studio job, it creates incomplete or empty files. What I want to do is read Parquet files from two S3 folders as separate data sources and join them on an ID. As part of the job I drop some columns and rename others, because the tables have conflicting column names. As a final step I want to write the joined table back to an S3 bucket as Parquet. Every time I run the job, it either produces empty files or files in which only a few columns contain data.

I have tried several options for the data source: reading directly from S3, using an Athena database (example below), using a crawler, and even converting the data to CSV. I saw some examples on Stack Overflow where conflicting data types caused errors, so I cast all columns of the files I use as sources in Glue to the data types I expect them to have. Unfortunately, none of this solved the problem.
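
For illustration, the casting I mean looks roughly like this (a minimal sketch applied to DataSource0 from the script below; resolveChoice with a "cast:" action is the DynamicFrame way to force a column type):

# Sketch: force the source columns to explicit types before joining,
# using the b_id/ua columns that appear in the script below.
casted0 = DataSource0.resolveChoice(
    specs = [("b_id","cast:string"),("ua","cast:string")],
    transformation_ctx = "casted0"
)
casted0.printSchema()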

I am doing this in AWS Glue Studio; the code below is copied from the Script tab.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv,['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'],args)
## @type: DataSource
## @args: [database = "glue_test",table_name = "MysqLdb_b",transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "glue_test",table_name = "MysqLdb_b",transformation_ctx = "DataSource0")
## @type: ApplyMapping
## @args: [mappings = [("b_id","string","b_id","string"),("ua","string","ua","string")],transformation_ctx = "Transform1"]
## @return: Transform1
## @inputs: [frame = DataSource0]
Transform1 = ApplyMapping.apply(frame = DataSource0,mappings = [("b_id","string","b_id","string"),("ua","string","ua","string")],transformation_ctx = "Transform1")
## @type: DataSource
## @args: [database = "glue_test",table_name = "MysqLdb_b_r",transformation_ctx = "DataSource1"]
## @return: DataSource1
## @inputs: []
DataSource1 = glueContext.create_dynamic_frame.from_catalog(database = "glue_test",table_name = "MysqLdb_b_r",transformation_ctx = "DataSource1")
## @type: ApplyMapping
## @args: [mappings = [("id","string","id","string")],transformation_ctx = "Transform2"]
## @return: Transform2
## @inputs: [frame = DataSource1]
Transform2 = ApplyMapping.apply(frame = DataSource1,mappings = [("id","string","id","string")],transformation_ctx = "Transform2")
## @type: Join
## @args: [columnConditions = ["="],joinType = outer,keys2 = ["id"],keys1 = ["b_id"],transformation_ctx = "Transform0"]
## @return: Transform0
## @inputs: [frame1 = Transform1,frame2 = Transform2]
Transform1DF = Transform1.toDF()
Transform2DF = Transform2.toDF()
Transform0 = DynamicFrame.fromDF(Transform1DF.join(Transform2DF,(Transform1DF['b_id'] == Transform2DF['id']),"outer"),glueContext,"Transform0")
## @type: DataSink
## @args: [connection_type = "s3",format = "parquet",connection_options = {"path": "s3://myfolder/myfolder/myfolder/myfolder/","partitionKeys": []},transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = Transform0]
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0,connection_type = "s3",format = "parquet",connection_options = {"path": "s3://myfolder/myfolder/myfolder/myfolder/","partitionKeys": []},transformation_ctx = "DataSink0")
job.commit()
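
For what it's worth, sanity checks like the following (a sketch using the frames defined in the script above) can help show at which step the rows go missing:

# Sketch: row counts and schemas after each step, to locate where data is lost
print("DataSource0 count:", DataSource0.count())
DataSource0.printSchema()
print("Transform0 count:", Transform0.count())
Transform0.toDF().show(5)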

Any suggestions on what steps to take, or on what might be causing this?
