如何解决当前步骤失败时,AWS 步骤函数不会将下一步添加到 EMR 集群
我已经从 AWS 步骤函数设置了一个状态机,它将创建一个 EMR 集群,添加几个 emr 步骤,然后终止集群。只要所有步骤都运行完成而没有任何错误,这就可以正常工作。如果一个步骤失败,尽管添加了一个捕获以继续下一步,这不会发生。每当一个步骤失败时,该步骤就会被标记为已捕获(图中的橙色),但下一步会被标记为已取消。
如果有帮助,这是我的阶跃函数定义:
{
"StartAt": "MyEMR-SMFlowContainer-beta","States": {
"MyEMR-SMFlowContainer-beta": {
"Type": "Parallel","End": true,"Branches": [
{
"StartAt": "CreateClusterStep-feature-generation-cluster-beta","States": {
"CreateClusterStep-feature-generation-cluster-beta": {
"Next": "Step-SuccessfulJobOne","Type": "Task","ResultPath": "$.Cluster.1.CreateClusterTask","Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync","Parameters": {
"Instances": {
"Ec2subnetIds": [
"subnet-*******345fd38423"
],"InstanceCount": 2,"KeepJobFlowAliveWhenNoSteps": true,"MasterInstanceType": "m4.xlarge","SlaveInstanceType": "m4.xlarge"
},"JobFlowRole": "MyEMR-emrInstance-beta-EMRInstanceRole","Name": "emr-step-fail-handle-test-cluster","ServiceRole": "MyEMR-emr-beta-EMRRole","Applications": [
{
"Name": "Spark"
},{
"Name": "Hadoop"
}
],"AutoScalingRole": "MyEMR-beta-FeatureG-CreateClusterStepfeature-NJB2UG1J1EWB","Configurations": [
{
"Classification": "spark-env","Configurations": [
{
"Classification": "export","Properties": {
"PYSPARK_PYTHON": "/usr/bin/python3"
}
}
]
}
],"LogUri": "s3://MyEMR-beta-feature-createclusterstepfeature-1jpp1wp3dfn04/emr/logs/","ReleaseLabel": "emr-5.32.0","VisibletoAllUsers": true
}
},"Step-SuccessfulJobOne": {
"Next": "Step-AlwaysFailingJob","Catch": [
{
"ErrorEquals": [
"States.ALL"
],"Next": "Step-AlwaysFailingJob"
}
],"TimeoutSeconds": 7200,"ResultPath": "$.ClusterStep.SuccessfulJobOne.AddSparkTask","Resource": "arn:aws:states:::elasticmapreduce:addStep.sync","Parameters": {
"ClusterId.$": "$.Cluster.1.CreateClusterTask.ClusterId","Step": {
"Name": "SuccessfulJobOne","ActionOnFailure": "CONTINUE","HadoopJarStep": {
"Jar": "command-runner.jar","Args": [
"spark-submit","--deploy-mode","client","--master","yarn","--conf","spark.logConf=true","--class","com.test.sample.core.EMRJobRunner","s3://my-****-bucket/jars/77/my-****-bucketBundleJar-1.0.jar","--JOB_NUMBER","1","--JOB_KEY","SuccessfulJobOne"
]
}
}
}
},"Step-AlwaysFailingJob": {
"Next": "Step-SuccessfulJobTwo","Next": "Step-SuccessfulJobTwo"
}
],"ResultPath": "$.ClusterStep.AlwaysFailingJob.AddSparkTask","Step": {
"Name": "AlwaysFailingJob","2","AlwaysFailingJob"
]
}
}
}
},"Step-SuccessfulJobTwo": {
"Next": "TerminateClusterStep-feature-generation-cluster-beta","Next": "TerminateClusterStep-feature-generation-cluster-beta"
}
],"ResultPath": "$.ClusterStep.SuccessfulJobTwo.AddSparkTask","Step": {
"Name": "DeviceJob","3","SuccessfulJobTwo"
]
}
}
}
},"TerminateClusterStep-feature-generation-cluster-beta": {
"End": true,"ResultPath": null,"Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync","Parameters": {
"ClusterId.$": "$.Cluster.1.CreateClusterTask.ClusterId"
}
}
}
}
]
}
},"TimeoutSeconds": 43200
}
有人可以就我如何在步骤中捕获失败并忽略它添加下一步提出建议。 提前致谢。
解决方法
问题是因为我没有在 catch 属性中指定 resultPath。这导致 resultPath 被 catch 块覆盖,因为 resultPath 的默认值是 $。下一步无法获取集群信息,因为该信息已被覆盖并因此被取消。
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],"Next": "Step-SuccessfulJobTwo"
}
],
一旦我更新了 catch 以获得正确的结果路径,它就会按预期工作。
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],"Next": "Step-SuccessfulJobTwo","ResultPath": "$.ClusterStep.SuccessfulJobOne.AddSparkTask.Error",}
],
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。