微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在将 DF 创建到 tempView 后运行 Spark Glue 作业时出错

如何解决在将 DF 创建到 tempView 后运行 Spark Glue 作业时出错

 Explanation

当我从动态框架创建 DF 时,它工作正常,我可以写 数据帧回到动态帧但是当我将数据帧转换为
createOrReplaceTempView 然后它向我抛出这个错误。列数 是相同的,从源到目的地没有任何变化

请帮我看看出了什么问题

ERROR

IllegalArgumentException: "requirement Failed: The number of columns doesn't 
match.\nOld column names (2): name,id\nNew column names (0): "

pysparkglueCode

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import glueContext
from awsglue.job import Job
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark import SparkConf,SparkContext
from pyspark.sql import *



 args = getResolvedOptions(sys.argv,['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = glueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'],args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database =   
"mygluedatabaSEOregon",table_name = "dev_glue_poc_gluetable",redshift_tmp_dir =  
args["TempDir"],transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource0,mappings = [("name","string","name","string"),("id","int","id","int")],transformation_ctx =   
"applymapping1")

selectfields2 = SelectFields.apply(frame = applymapping1,paths = ["name","id"],transformation_ctx = "selectfields2")

getoutput = selectfields2.toDF() #.select('name','id')

getoutput.createOrReplaceTempView("info")

sqlData = spark.sql("select name,id from info")

outputsql = sqlData.toDF()

getoutputDFY = DynamicFrame.fromDF(outputsql,glueContext,"getoutputDFY")


resolvechoice3 = ResolveChoice.apply(frame = getoutputDFY,choice = 
"MATCH_CATALOG",database = "mygluedatabaSEOregon",table_name = 
"dev_glue_poc_importfromglue",transformation_ctx = "resolvechoice3")



resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3,choice = "make_cols",transformation_ctx = "resolvechoice4")



  datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4,table_name = "dev_glue_poc_importfromglue",redshift_tmp_dir = args["TempDir"],transformation_ctx = "datasink5")
  job.commit()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。