如何解决如何确保队列消息已在Azure Functions中成功处理?
我有一个使用HTTP触发器和队列触发器构建的C#Azure Functions(根据App Service计划)应用程序。该应用程序通过在客户端计算机上安装脚本来工作,该脚本使用SQL查询将客户端数据库中的各种文件移至临时Azure Blob存储中,以从客户端数据库中提取各种文件。每个文件完成后,将调用HTTP触发器,该触发器将为队列触发器创建队列消息,以拾取该消息并将文件从临时Blob存储区移至Blob存储区中的永久位置。 HTTP触发器完成并将消息放入队列后,执行将返回到客户端脚本以开始处理下一个SQL查询。
我担心的是,当队列触发器实际上仍在工作或可能失败时,尤其是在并行处理多个客户端时,这些队列消息将堆积起来,并且客户端脚本将以错误的成功消息完成。在继续下一个SQL查询之前,是否有办法确保队列消息已成功处理?
我可能有3个客户端,并且在其计算机上安装了应用程序,每个客户端都设置为在12AM执行这些脚本,并且由于它们托管在客户端计算机上,因此可以并行运行。 客户端脚本
// perform sql query to extract data from client database
// move extracted data to temporary Storage Blob hosted on the App Service storage account
return await httpClient.PostAsync(uri of the file in temporary blob storage)
当文件准备好处理时,此前await
会发布到HTTP。
Azure函数HTTP触发器
// get storage account credentials
// write message to storage queue "job-submissions'
return new OkResult();
现在,“职位提交”队列中有来自多个客户端的文件。
Azure功能队列触发器
// pick up message from "job-submissions" queue
// use the Microsoft.Azure.Storage.Blob library to move files
// to a permanent spot in the data lake
// create Meta file with info about the file
// Meta file contains info for when the extraction started and completed
// delete the temporary file
// job completed and the next queue message can be picked up
所以问题是,当HTTP触发器将消息写入队列时,我无法得知队列已完成文件的处理。现在这不是什么大问题,因为该过程是如此迅速,以至于我在HTTP触发器中向队列中发送消息时,队列最多只需要几秒钟即可处理文件。我想知道各个作业何时完成的原因是,我在客户端脚本中有最后一步:
客户端脚本
// after all jobs for a client have been submitted by HTTP
// get storage account credentials
// write message to a queue "client-tasks-completed"
// queue message contains client name in the message
// initialVisibilityDelay set to 2 minutes
// this ensures queue has finished processing the files
然后,一个单独的Python Azure函数在该队列上进行侦听以进行进一步处理:
Python QueueTrigger
# pick up message from "client-tasks-completed" queue
if 'client1' == queue_msg['ClientName']:
# standardize information within the files and write to our Azure sql database
elif 'client2' == queue_msg['ClientName']:
# standardize information within the files and write to our Azure sql database
elif 'client3' == queue_msg['ClientName']:
# standardize information within the files and write to our Azure sql database
Python Azure函数处于消耗计划中,并且batchSize
设置为1
,因为客户端文件有时可能很大,并且我不想超过1.5GB的内存限制。所以我有两个问题,第一个是我如何知道第一个队列触发器已完成工作?第二个是,如何确保Python QueueTrigger不开始累积消息?我认为通过为侦听相同队列的两个队列触发器创建单独的Azure函数,可以潜在地解决这两个问题。这样可以减轻双方的负担,但是我不确定这是否是最佳实践。在这里我对问题2寻求更多指导的地方请参见我的问题:Using multiple Azure Functions QueueTriggers to listen on the same storage queue
解决方法
更新:
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Threading;
namespace FunctionApp31
{
public static class Function1
{
[FunctionName("Function1")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function,"get","post",Route = null)] HttpRequest req,ILogger log)
{
string a = "111";
a=XX(a).Result;
return new OkObjectResult(a);
}
public static async Task<string> XX(string x)
{
await Task.Run(()=>{
Thread.Sleep(3000);
x = x + "222";
Console.WriteLine(x);
}
);
return x;
}
}
}
原始答案:
我建议您顺序执行处理逻辑,而不是异步执行。或者,您可以等待异步操作完成后再返回,这样可以确保执行成功后再返回成功。(这样可以避免在队列仍按注释中所述处理时返回结果。)
我注意到您问了一个新问题。我认为您可以扩展实例,而不用创建多个功能的应用程序。 (当然,创建多个功能的应用程序没有问题。)如果您基于消耗计划,则实例将根据负载自动扩展。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。