微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用传递的参数调用粘合作业和 lambda 函数的步骤函数

如何解决使用传递的参数调用粘合作业和 lambda 函数的步骤函数

场景: 我想将 S3(源文件位置)和 s3(输出文件位置)作为 我的工作流程中的输入参数。

工作流程: Aws Step Function 调用 -> lambda 函数和 lambda 函数调用 -> 粘合作业, 我想从 step 函数 -> lambda 函数 -> 胶水作业传递参数,胶水作业在哪里 对 S3 输入文件进行一些转换并将其输出写入 S3 输出文件

Below are step function,lambda function and glue job respectively and the input 
json which is passed to step function as input.

1:输入(传递的参数):

{
    "S3InputFileLocation": "s3://bucket_name/sourcefile.csv","S3OutputFileLocation": "s3://bucket_name/FinalOutputfile.csv"
}

2:Step Function/状态机(使用上述输入调用 lambda 参数):

{
   "StartAt":"AWsstepFunctionInitiator","States":{  
      "AWsstepFunctionInitiator": {  
         "Type":"Task","Resource":"arn:aws:lambda:us-east-1:xxxxxx:function:AWSLambdaFunction","InputPath": "$","End": true
      }
   }
}

3:Lambda 函数(即上面调用的 AWSLambdaFunction,下面又调用 AWSglueJob):

import json
import boto3

def lambda_handler(event,context):
    client= boto3.client("glue")
    client.start_job_run(
        JobName='AWSglueJob',Arguments={
        'S3InputFileLocation': event["S3InputFileLocation"],'S3OutputFileLocation': event["S3OutputFileLocation"]})
    return{
        'statusCode':200,'body':json.dumps('AWS lambda function invoked!')
    }

4:AWS glue 作业脚本:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import glueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

print('AWS glue Job started')
args = getResolvedOptions(sys.argv,['AWSglueJob','S3InputFileLocation','S3OutputFileLocation'])
S3InputFileLocation= args['S3InputFileLocation']
S3OutputFileLocation= args['S3OutputFileLocation']

glueContext = glueContext(SparkContext.getorCreate())

dfnew = glueContext.create_dynamic_frame_from_options("s3",{'paths': [S3_InputLocation] },format="csv" )

datasink = glueContext.write_dynamic_frame.from_options(frame = dfnew,connection_type = "s3",connection_options = {"path": S3_OutputLocation},format = "csv",transformation_ctx ="datasink") 

上述 step 函数和相应的工作流执行时没有任何编译或运行时错误,我也看到参数从 Step 函数成功传递到 lambda 函数,但我在胶水作业中的打印语句都没有登录到云监视中,这意味着lambda 函数调用粘合作业时会出现一些问题。请帮我弄清楚我从 lambda 调用胶水的方式是否有问题?

解决方法

嘿嘿

也许它已经解决了,但这两个链接有帮助:

  1. https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-get-resolved-options.html
  2. https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html

并准确回答您的问题:在此处的参数中添加“--”:

    import json
    import boto3

    def lambda_handler(event,context):
        client= boto3.client("glue")
        client.start_job_run(
             JobName='AWSGlueJob',Arguments={
                '--S3InputFileLocation': event["S3InputFileLocation"],'--S3OutputFileLocation': event["S3OutputFileLocation"]})
        return{
         'statusCode':200,'body':json.dumps('AWS lambda function invoked!')
        }

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。