如何解决使用传递的参数调用粘合作业和 lambda 函数的步骤函数
场景: 我想将 S3(源文件位置)和 s3(输出文件位置)作为 我的工作流程中的输入参数。
工作流程: Aws Step Function 调用 -> lambda 函数和 lambda 函数调用 -> 粘合作业, 我想从 step 函数 -> lambda 函数 -> 胶水作业传递参数,胶水作业在哪里 对 S3 输入文件进行一些转换并将其输出写入 S3 输出文件
Below are step function,lambda function and glue job respectively and the input
json which is passed to step function as input.
1:输入(传递的参数):
{
"S3InputFileLocation": "s3://bucket_name/sourcefile.csv","S3OutputFileLocation": "s3://bucket_name/FinalOutputfile.csv"
}
2:Step Function/状态机(使用上述输入调用 lambda 参数):
{
"StartAt":"AWsstepFunctionInitiator","States":{
"AWsstepFunctionInitiator": {
"Type":"Task","Resource":"arn:aws:lambda:us-east-1:xxxxxx:function:AWSLambdaFunction","InputPath": "$","End": true
}
}
}
3:Lambda 函数(即上面调用的 AWSLambdaFunction,下面又调用 AWSglueJob):
import json
import boto3
def lambda_handler(event,context):
client= boto3.client("glue")
client.start_job_run(
JobName='AWSglueJob',Arguments={
'S3InputFileLocation': event["S3InputFileLocation"],'S3OutputFileLocation': event["S3OutputFileLocation"]})
return{
'statusCode':200,'body':json.dumps('AWS lambda function invoked!')
}
4:AWS glue 作业脚本:
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import glueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
print('AWS glue Job started')
args = getResolvedOptions(sys.argv,['AWSglueJob','S3InputFileLocation','S3OutputFileLocation'])
S3InputFileLocation= args['S3InputFileLocation']
S3OutputFileLocation= args['S3OutputFileLocation']
glueContext = glueContext(SparkContext.getorCreate())
dfnew = glueContext.create_dynamic_frame_from_options("s3",{'paths': [S3_InputLocation] },format="csv" )
datasink = glueContext.write_dynamic_frame.from_options(frame = dfnew,connection_type = "s3",connection_options = {"path": S3_OutputLocation},format = "csv",transformation_ctx ="datasink")
上述 step 函数和相应的工作流执行时没有任何编译或运行时错误,我也看到参数从 Step 函数成功传递到 lambda 函数,但我在胶水作业中的打印语句都没有登录到云监视中,这意味着lambda 函数调用粘合作业时会出现一些问题。请帮我弄清楚我从 lambda 调用胶水的方式是否有问题?
解决方法
嘿嘿
也许它已经解决了,但这两个链接有帮助:
- https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-get-resolved-options.html
- https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html
并准确回答您的问题:在此处的参数中添加“--”:
import json
import boto3
def lambda_handler(event,context):
client= boto3.client("glue")
client.start_job_run(
JobName='AWSGlueJob',Arguments={
'--S3InputFileLocation': event["S3InputFileLocation"],'--S3OutputFileLocation': event["S3OutputFileLocation"]})
return{
'statusCode':200,'body':json.dumps('AWS lambda function invoked!')
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。