如何解决:在异步 Lambda 函数中,返回 Promise 或向 ResponseURL 发送响应都无法终止 CloudFormation 自定义资源调用
我有一个通过 CloudFormation 模板作为自定义资源调用的 lambda 函数。它创建/删除 AWS Connect 实例。 API 调用工作正常,但我似乎无法终止自定义资源调用,因此最后一个 CF 块仍然是 CREATE_IN_PROGRESS。无论我从异步函数返回什么,它都不会成功终止 CF 执行。
我能够像 https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/walkthrough-custom-resources-lambda-lookup-amiids.html 一样成功使用非异步处理程序,但我需要进行多个 API 调用并等待完成,因此需要异步处理程序。
下面是最简形式的代码。我几乎尝试了所有方法,包括使用 callback 和 context(即 exports.handler = async function(event,context,callback) {...}),而对于异步处理程序来说,这两者本应是不必要的。我试过用 cfn-response 直接发送响应,但它似乎被异步处理程序忽略了。我还试过直接返回 Promise(带或不带 await),以及返回包含各种 responseStatus 和 responseData 的变量,但似乎都没有任何效果。
# SAM template: a Lambda-backed custom resource that creates an Amazon Connect
# instance. The inline handler below is the question's first (non-terminating)
# attempt and is reproduced with its behaviour unchanged.
Transform: 'AWS::Serverless-2016-10-31'
Parameters:
  IdentityManagementType:
    Description: The type of identity management for your Amazon Connect users.
    Type: String
    AllowedValues: ["SAML","CONNECT_MANAGED","EXISTING_DIRECTORY"]
    Default: "SAML"
  InboundCallsEnabled:
    Description: Whether your contact center handles incoming contacts.
    Type: String
    AllowedValues: [true,false]
    Default: true
  InstanceAlias:
    Description: The name for your instance.
    Type: String
    MaxLength: 62
  OutboundCallsEnabled:
    Description: Whether your contact center allows outbound calls.
    Type: String
    AllowedValues: [true,false]
    Default: true
  DirectoryId:
    Description: Optional. The identifier for the directory,if using this type of Identity Management.
    Type: String
    # An empty default makes the parameter genuinely optional; the
    # HasDirectoryId condition below tests for the empty string.
    Default: ""
  ClientToken:
    Description: Optional. The idempotency token. Used for concurrent deployments
    Type: String
    MaxLength: 500
    # Empty default for the same reason as DirectoryId (see HasClientToken).
    Default: ""
  Region:
    Description: Region to place the AWS Connect Instance
    Type: String
    Default: us-east-1
#Handler for optional values
Conditions:
  HasClientToken: !Not
    - !Equals
      - ""
      - !Ref ClientToken
  HasDirectoryId: !Not
    - !Equals
      - ""
      - !Ref DirectoryId
Resources:
  CreateConnectInstance:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub "${AWS::StackName}-AWSConnectInstance"
      Handler: index.handler
      Runtime: nodejs12.x
      Description: Invoke a function to create an AWS Connect instance.
      MemorySize: 128
      Timeout: 30
      Role: !GetAtt LambdaExecutionRole.Arn
      Layers:
        - !Sub "arn:aws:lambda:us-east-1:${AWS::AccountId}:layer:node_sdk:1"
      Environment:
        Variables:
          IdentityManagementType:
            Ref: IdentityManagementType
          InboundCallsEnabled:
            Ref: InboundCallsEnabled
          InstanceAlias:
            Ref: InstanceAlias
          OutboundCallsEnabled:
            Ref: OutboundCallsEnabled
          Region:
            Ref: Region
          #Optional Values
          # The pseudo parameter is spelled AWS::NoValue (case-sensitive).
          ClientToken: !If
            - HasClientToken
            - !Ref ClientToken
            - !Ref "AWS::NoValue"
          # DirectoryId must be driven by its own condition and parameter,
          # not by the ClientToken ones.
          DirectoryId: !If
            - HasDirectoryId
            - !Ref DirectoryId
            - !Ref "AWS::NoValue"
      InlineCode: |
        var aws = require("aws-sdk");
        exports.handler = async function(event) {
          console.log("REQUEST RECEIVED:\n" + JSON.stringify(event));
          var connect = new aws.Connect({region: event.ResourceProperties.Region});
          var isInboundCallsEnabled = (process.env.InboundCallsEnabled == 'true');
          var isOutboundCallsEnabled = (process.env.OutboundCallsEnabled == 'true');
          var createInstanceParams = {
            InboundCallsEnabled: isInboundCallsEnabled,
            OutboundCallsEnabled: isOutboundCallsEnabled,
            IdentityManagementType: process.env.IdentityManagementType,
            ClientToken: process.env.ClientToken,
            DirectoryId: process.env.DirectoryId,
            InstanceAlias: process.env.InstanceAlias
          };
          // Create AWS Connect instance using specified parameters
          if (event.RequestType == "Create") {
            return await connect.createInstance(createInstanceParams).promise();
            // I can store this in a variable and read the contents fine,but...
            // returning the promise does not terminate execution
          }
        };
  InvokeCreateConnectInstance:
    Type: Custom::CreateConnectInstance
    Properties:
      # The property name is case-sensitive: ServiceToken.
      ServiceToken: !GetAtt CreateConnectInstance.Arn
      Region: !Ref "AWS::Region"
https://docs.aws.amazon.com/lambda/latest/dg/nodejs-handler.html 处的文档明确指出您应该能够直接从任何异步函数返回 await apiCall.promise(),这正是我想要做的,例如
const s3 = new AWS.S3()
exports.handler = async function(event) {
return s3.listBuckets().promise()
}
为什么我不能从异步函数返回? API 调用再次正常工作,Connect 实例被创建和删除(尽管我为了简洁起见省略了删除代码),但 CF 只是挂了几个小时,直到最终说“自定义资源未能在预期时间内稳定”
为了可读性,这里是内联代码本身:
exports.handler = async function(event) {
console.log("REQUEST RECEIVED:\n" + JSON.stringify(event));
var connect = new aws.Connect({region: event.ResourceProperties.Region});
var isInboundCallsEnabled = (process.env.InboundCallsEnabled == 'true');
var isOutboundCallsEnabled = (process.env.OutboundCallsEnabled == 'true');
var createInstanceParams = {
InboundCallsEnabled: isInboundCallsEnabled,but...
// returning the promise does not terminate CF execution
}
};
更新:我已经完全按照 AMI 查找示例(第一个链接)中所示实现了 sendResponse 方法,并且正在发送完全正确的响应结构,它甚至在数据字段中包含新创建的连接实例 ID:
{
"Status": "SUCCESS","Reason": "See the details in CloudWatch Log Stream: 2020/12/23/[$LATEST]6fef3553870b4fba90479a37b4360cee","PhysicalResourceId": "2020/12/23/[$LATEST]6fef3553870b4fba90479a37b4360cee","StackId": "arn:aws:cloudformation:us-east-1:642608065726:stack/cr12/1105a290-4534-11eb-a6de-0a8534d05dcd","RequestId": "2f7c3d9e-941f-402c-b739-d2d965288cfe","LogicalResourceId": "InvokeCreateConnectInstance","Data": {
"InstanceId": "2ca7aa49-9b20-4feb-8073-5f23d63e4cbc"
}
}
并且仍然不会在 CloudFormation 中关闭自定义资源。我只是不明白为什么当我将上述内容返回到 event.responseURL 时会发生这种情况。这就像指定一个异步处理程序完全破坏了自定义资源处理程序并阻止它关闭。
更新:当我用 curl 手动将上述响应直接发送到 event.ResponseURL 时,CF 资源成功注册了!WTF……我发送的响应与 Lambda 函数发送的响应完全相同,CloudFormation 接受来自 curl 的响应,却不接受来自我的 Lambda 函数的响应。
var aws = require("aws-sdk");
exports.handler = async function(event,callback) {
console.log("REQUEST RECEIVED:\n" + JSON.stringify(event));
var connect = new aws.Connect({region: event.ResourceProperties.Region});
var isInboundCallsEnabled = (process.env.InboundCallsEnabled == 'true');
var isOutboundCallsEnabled = (process.env.OutboundCallsEnabled == 'true');
var createInstanceParams = {
InboundCallsEnabled: isInboundCallsEnabled,InstanceAlias: process.env.InstanceAlias
};
var responseStatus;
var responseData = {};
// Create Connect instance
if (event.RequestType == "Create") {
try {
var createInstanceRequest = await connect.createInstance(createInstanceParams).promise();
responseStatus = "SUCCESS";
responseData = {"InstanceId": createInstanceRequest.Id};
} catch (err) {
responseStatus = "Failed";
responseData = {Error: "CreateInstance Failed"};
console.log(responseData.Error + ":\n",err);
}
sendResponse(event,responseStatus,responseData);
return;
}
// Look up the ID and call deleteInstance.
if (event.RequestType == "Delete") {
var instanceId;
var listInstanceRequest = await connect.listInstances({}).promise();
listInstanceRequest.InstanceSummaryList.forEach(instance => {
if (instance.InstanceAlias == createInstanceParams.InstanceAlias) {
instanceId = instance.Id;
}
});
if (instanceId !== undefined) {
try {
var deleteInstanceRequest = await connect.deleteInstance({"InstanceId": instanceId}).promise();
responseStatus = "SUCCESS";
responseData = {"InstanceId": instanceId};
} catch (err) {
responseStatus = "Failed";
responseData = {Error: "DeleteInstance call Failed"};
console.log(responseData.Error + ":\n",err);
}
} else {
responseStatus = "Failed";
responseData = {Error: "DeleteInstance Failed; no match found"};
console.log(responseData.Error);
}
sendResponse(event,responseData);
return;
}
};
// Send response to the pre-signed S3 URL
function sendResponse(event,responseData) {
var responseBody = JSON.stringify({
Status: responseStatus,Reason: "CloudWatch Log Stream: " + context.logStreamName,PhysicalResourceId: context.logStreamName,StackId: event.StackId,RequestId: event.RequestId,LogicalResourceId: event.LogicalResourceId,Data: responseData
});
console.log("RESPONSE BODY:\n",responseBody);
var https = require("https");
var url = require("url");
var parsedUrl = url.parse(event.ResponseURL);
var options = {
hostname: parsedUrl.hostname,port: 443,path: parsedUrl.path,method: "PUT",headers: {
"content-type": "","content-length": responseBody.length
}
};
console.log("SENDING RESPONSE...\n");
var request = https.request(options,function(response) {
console.log("STATUS: " + response.statusCode);
console.log("HEADERS: " + JSON.stringify(response.headers));
// Tell AWS Lambda that the function execution is done
context.done();
});
request.on("error",function(error) {
console.log("sendResponse Error:" + error);
// Tell AWS Lambda that the function execution is done
context.done();
});
// write data to request body
request.write(responseBody);
request.end();
}
已经在这两天了:(
PS 在日志中,“RESPONSE BODY” 按预期显示,与我上面复制的内容一致,日志也显示了 “SENDING RESPONSE”,但始终没有执行到 https.request() 回调中的 “STATUS: ” 和 “HEADERS: ” 部分,这让我觉得异步处理干扰了这个调用……我不确定。
解决方法
这个问题确实很棘手,但最终还是解决了。我必须让 sendResponse 函数变成可等待的:在其中创建一个 Promise,等待(await)该 Promise 并将其返回。这样我就可以在处理程序中调用 “return await sendResponse(event,context,responseStatus,responseData);”,之后一切正常:创建和删除操作都成功,CloudFormation 自定义资源也按预期完成。呼。在此发布代码,希望其他人能从中受益。
var aws = require("aws-sdk");
exports.handler = async function(event,callback) {
console.log("REQUEST RECEIVED:\n" + JSON.stringify(event));
var connect = new aws.Connect({region: event.ResourceProperties.Region});
var isInboundCallsEnabled = (process.env.InboundCallsEnabled == 'true');
var isOutboundCallsEnabled = (process.env.OutboundCallsEnabled == 'true');
var createInstanceParams = {
InboundCallsEnabled: isInboundCallsEnabled,OutboundCallsEnabled: isOutboundCallsEnabled,IdentityManagementType: process.env.IdentityManagementType,ClientToken: process.env.ClientToken,DirectoryId: process.env.DirectoryId,InstanceAlias: process.env.InstanceAlias
};
var responseStatus;
var responseData = {};
if (event.RequestType == "Create") {
try {
var createInstanceRequest = await connect.createInstance(createInstanceParams).promise();
responseStatus = "SUCCESS";
responseData = {"InstanceId": createInstanceRequest.Id};
} catch (err) {
responseStatus = "FAILED";
responseData = {Error: "CreateInstance failed"};
console.log(responseData.Error + ":\n",err);
}
return await sendResponse(event,responseData);
}
if (event.RequestType == "Delete") {
var instanceId;
var listInstanceRequest = await connect.listInstances({}).promise();
listInstanceRequest.InstanceSummaryList.forEach(instance => {
if (instance.InstanceAlias == createInstanceParams.InstanceAlias) {
instanceId = instance.Id;
}
});
if (instanceId !== undefined) {
try {
var deleteInstanceRequest = await connect.deleteInstance({"InstanceId": instanceId}).promise();
responseStatus = "SUCCESS";
responseData = {"InstanceId": instanceId};
} catch (err) {
responseStatus = "FAILED";
responseData = {Error: "DeleteInstance call failed"};
console.log(responseData.Error + ":\n",err);
}
} else {
responseStatus = "FAILED";
responseData = {Error: "DeleteInstance failed; no match found"};
console.log(responseData.Error);
}
return await sendResponse(event,responseData);
}
};
async function sendResponse(event,responseData) {
let responsePromise = new Promise((resolve,reject) => {
var responseBody = JSON.stringify({
Status: responseStatus,Reason: "CloudWatch Log Stream: " + context.logStreamName,PhysicalResourceId: context.logStreamName,StackId: event.StackId,RequestId: event.RequestId,LogicalResourceId: event.LogicalResourceId,Data: responseData
});
console.log("RESPONSE BODY:\n",responseBody);
var https = require("https");
var url = require("url");
var parsedUrl = url.parse(event.ResponseURL);
var options = {
hostname: parsedUrl.hostname,port: 443,path: parsedUrl.path,method: "PUT",headers: {
"content-type": "","content-length": responseBody.length
}
};
console.log("SENDING RESPONSE...\n");
var request = https.request(options,function(response) {
console.log("STATUS: " + response.statusCode);
console.log("HEADERS: " + JSON.stringify(response.headers));
resolve(JSON.parse(responseBody));
context.done();
});
request.on("error",function(error) {
console.log("sendResponse Error:" + error);
reject(error);
context.done();
});
request.write(responseBody);
request.end();
});
return await responsePromise;
}
,
此答案是 OP 答案的一个变体,面向在 CloudFormation 中通过 AWS::Lambda::Function 资源的 “Code” 属性使用 “ZipFile” 选项的人。ZipFile 方法的优势在于,除了允许将 Lambda 代码内联到 CF 模板之外,它还会自动捆绑 “cfn-response” 模块,其作用与 OP 答案中的 “async function sendResponse” 非常相似。借助 OP 答案中关于返回 Promise 的思路(谢谢,我之前也被卡住并感到困惑),下面是我如何把 cfn-response 的 send 函数包装成一个可 await 的 Promise,以便在异步 AWS API 调用(为简洁起见已省略)完成后向 CF 发出信号:
# Lambda-backed custom resource using the ZipFile option. Helper functions
# (getStack, createSnapshot, describeSnapshot, poll) and the stackNameSrc /
# upgradeRequired values are omitted from this excerpt.
CreateSnapshotFunction:
  Type: AWS::Lambda::Function
  Properties:
    Runtime: nodejs12.x
    Handler: index.handler
    Timeout: 900 # 15 mins
    Code:
      ZipFile: !Sub |
        const resp = require('cfn-response');
        const aws = require('aws-sdk');
        const cf = new aws.CloudFormation({apiVersion: '2010-05-15'});
        const rds = new aws.RDS({apiVersion: '2014-10-31'});
        exports.handler = async function(evt,ctx) {
          if (evt.RequestType == "Create") {
            try {
              // Query the given CF stack,determine its database
              // identifier,create a snapshot of the database,
              // and await an "available" status for the snapshot
              let stack = await getStack(stackNameSrc);
              let srcSnap = await createSnapshot(stack);
              let pollFn = () => describeSnapshot(srcSnap.DBSnapshot.DBSnapshotIdentifier);
              let continueFn = snap => snap.DBSnapshots[0].Status !== 'available';
              await poll(pollFn,continueFn,10,89); // timeout after 14 min,50 sec
              // Send response to CF
              await send(evt,ctx,resp.SUCCESS,{
                SnapshotId: srcSnap.DBSnapshot.DBSnapshotIdentifier,
                UpgradeRequired: upgradeRequired
              });
            } catch(err) {
              // Stringify the error: a raw Error object serialises to {} in JSON.
              await send(evt,ctx,resp.FAILED,{ ErrorMessage: String(err) });
            }
          } else {
            // Send success to CF for delete and update requests
            await send(evt,ctx,resp.SUCCESS,{});
          }
        };
        // Wrap cfn-response's send() so the async handler can await it.
        // The promise is never settled here; presumably the invocation ends
        // when cfn-response signals completion through ctx - verify against
        // the cfn-response module source.
        function send(evt,ctx,status,data) {
          return new Promise(() => { resp.send(evt,ctx,status,data) });
        }
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。