
Creating a custom PySpark Glue script to read data from Redshift

How do I create a custom PySpark Glue script that reads data from Redshift?

I wrote the following code, which reads data from S3 and writes it back to S3, and ran it as an AWS Glue job.

This is custom PySpark code; I did not use the script that Glue generates.

Here is the script:

from pyspark import SparkConf,SparkContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import *

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
import sys

args = getResolvedOptions(sys.argv,['TempDir','JOB_NAME'])

conf = SparkConf()
conf.set("spark.sql.parquet.compression.codec","snappy")
conf.set("spark.sql.parquet.writeLegacyFormat","true")

output_dir_path="s3://mygluecrawler/pysparkglueData/"

# pass the SparkConf so the Parquet settings above actually take effect
sc = SparkContext(conf=conf)

glueContext = GlueContext(sc)

spark = glueContext.spark_session

job = Job(glueContext)

job.init(args['JOB_NAME'],args)

input_file = "s3://mygluecrawler/pysparkglueData/store.csv"

#print(" Dropping the malformed data")

sparkDF = spark.read.format("csv").option("header","true").option("mode",'DROPMALFORMED').option("mode","FAILFAST").load(input_file)

sparkDF = sparkDF.withColumn("Competitiondistance",sparkDF.Competitiondistance.cast('float'))
sparkDF = sparkDF.withColumn("CompetitionopenSinceMonth",sparkDF.CompetitionopenSinceMonth.cast('int'))
sparkDF = sparkDF.withColumn("CompetitionopenSinceYear",sparkDF.CompetitionopenSinceYear.cast('int'))
sparkDF = sparkDF.withColumn("Promo2",sparkDF.Promo2.cast('int'))
sparkDF = sparkDF.withColumn("Promo2SinceWeek",sparkDF.Promo2SinceWeek.cast('int'))
sparkDF = sparkDF.withColumn("Promo2SinceYear",sparkDF.Promo2SinceYear.cast('int'))


#sparkDF = sparkDF.fillna(value=0)

# fillna replaces nulls with the given value (here -99), but only in the two columns listed in subset

sparkDF = sparkDF.fillna(value=-99,subset=["Promo2SinceWeek","Promo2SinceYear"])


sparkColumns = sparkDF.select('StoreType','Competitiondistance','CompetitionopenSinceYear','Promo2SinceWeek','Promo2')

sparkColumns.write.format('parquet').partitionBy(['StoreType','Promo2']).mode('append').option("path",output_dir_path).save()

The script above runs fine.

Now I want to do the same thing, but instead of reading a file from S3 and writing to S3, I want to read from Redshift and write to Redshift using Glue.

Below I read data from Redshift and write it back to Redshift. Please check the script and tell me whether this is the right approach; I need some help.

from pyspark import SparkConf,SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as f

from awsglue.dynamicframe import DynamicFrame
from awsglue.utils import getResolvedOptions
from awsglue.transforms import *

from awsglue.context import GlueContext
from awsglue.job import Job
import sys

# TempDir must be requested here because it is used below as redshift_tmp_dir
args = getResolvedOptions(sys.argv,['TempDir','JOB_NAME'])

conf = SparkConf()

sc = SparkContext()

glueContext = GlueContext(sc)

spark = glueContext.spark_session

job = Job(glueContext)

job.init(args['JOB_NAME'],args)

jdbcURL = "jdbc:redshift://my-redshift-database.cealcs9iyaaz.us-****-2.redshift.amazonaws.com:5439/dev?user=username&password=password"

#sparkDF = spark.read.format("csv").option("header","FAILFAST").load(input_file)
sparkDf = spark.read.option("url",jdbcURL) \
            .option("dbtable","glue_poc.s3toredshift")\
            .option("tempdir","s3://mygluecrawler/sparkLogs/")\
            .load()

sparkDf.createOrReplaceTempView("people")

newdata = spark.sql("select * from people")


dynamic_df = DynamicFrame.fromDF(newdata,glueContext,"dynamic_df")


mapped_df = ResolveChoice.apply(frame = dynamic_df,choice = "make_cols",transformation_ctx = "mapped_df")

datasink = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = mapped_df,
    catalog_connection = "redshift-new-connection",
    connection_options = {"dbtable": "glue_poc.s3toredshift", "database": "dev"},
    redshift_tmp_dir = args["TempDir"],
    transformation_ctx = "datasink")

job.commit()
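
For reference, Glue also exposes a native Redshift connector on the read side that mirrors the write_dynamic_frame.from_jdbc_conf call used above. The following is only a minimal sketch, assuming the same cluster endpoint, table, and TempDir as in the script; the user/password values are placeholders to be replaced with real credentials.

# Sketch: reading the same table through Glue's Redshift connection type instead of spark.read,
# reusing the endpoint, table and TempDir from the script above (credentials are placeholders).
redshift_read_options = {
    "url": "jdbc:redshift://my-redshift-database.cealcs9iyaaz.us-****-2.redshift.amazonaws.com:5439/dev",
    "dbtable": "glue_poc.s3toredshift",
    "user": "username",
    "password": "password",
    "redshiftTmpDir": args["TempDir"]
}

redshift_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type = "redshift",
    connection_options = redshift_read_options,
    transformation_ctx = "redshift_dyf")

# Convert to a Spark DataFrame to keep using spark.sql / DataFrame transforms as before
sparkDf = redshift_dyf.toDF()

If the Redshift connection is already registered in the Glue Data Catalog, glueContext.create_dynamic_frame.from_catalog(database=..., table_name=..., redshift_tmp_dir=args["TempDir"]) is another option that keeps credentials out of the script entirely.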
