如何确保队列消息已在Azure Functions中成功处理?

如何解决如何确保队列消息已在Azure Functions中成功处理?

我有一个使用HTTP触发器和队列触发器构建的C#Azure Functions(根据App Service计划)应用程序。该应用程序通过在客户端计算机上安装脚本来工作,该脚本使用SQL查询将客户端数据库中的各种文件移至临时Azure Blob存储中,以从客户端数据库提取各种文件。每个文件完成后,将调用HTTP触发器,该触发器将为队列触发器创建队列消息,以拾取该消息并将文件从临时Blob存储区移至Blob存储区中的永久位置。 HTTP触发器完成并将消息放入队列后,执行将返回到客户端脚本以开始处理下一个SQL查询

我担心的是,当队列触发器实际上仍在工作或可能失败时,尤其是在并行处理多个客户端时,这些队列消息将堆积起来,并且客户端脚本将以错误的成功消息完成。在继续下一个SQL查询之前,是否有办法确保队列消息已成功处理?

编辑:添加代码示例

我可能有3个客户端,并且在其计算机上安装了应用程序,每个客户端都设置为在12AM执行这些脚本,并且由于它们托管在客户端计算机上,因此可以并行运行。 客户端脚本

// perform sql query to extract data from client database
// move extracted data to temporary Storage Blob hosted on the App Service storage account
return await httpClient.PostAsync(uri of the file in temporary blob storage)

文件准备好处理时,此前await会发布到HTTP。
Azure函数HTTP触发器

// get storage account credentials
// write message to storage queue "job-submissions'
return new OkResult();

现在,“职位提交”队列中有来自多个客户端的文件
Azure功能队列触发器

// pick up message from "job-submissions" queue
// use the Microsoft.Azure.Storage.Blob library to move files
// to a permanent spot in the data lake
// create Meta file with info about the file 
// Meta file contains info for when the extraction started and completed
// delete the temporary file
// job completed and the next queue message can be picked up  

所以问题是,当HTTP触发器将消息写入队列时,我无法得知队列已完成文件的处理。现在这不是什么大问题,因为该过程是如此迅速,以至于我在HTTP触发器中向队列中发送消息时,队列最多只需要几秒钟即可处理文件。我想知道各个作业何时完成的原因是,我在客户端脚本中有最后一步:
客户端脚本

// after all jobs for a client have been submitted by HTTP
// get storage account credentials
// write message to a queue "client-tasks-completed" 
// queue message contains client name in the message 
// initialVisibilityDelay set to 2 minutes 
// this ensures queue has finished processing the files

然后,一个单独的Python Azure函数在该队列上进行侦听以进行进一步处理:
Python QueueTrigger

# pick up message from "client-tasks-completed" queue
if 'client1' == queue_msg['ClientName']:
    # standardize information within the files and write to our Azure sql database
elif 'client2' == queue_msg['ClientName']:
    # standardize information within the files and write to our Azure sql database
elif 'client3' == queue_msg['ClientName']:
    # standardize information within the files and write to our Azure sql database

Python Azure函数处于消耗计划中,并且batchSize设置为1,因为客户端文件有时可能很大,并且我不想超过1.5GB的内存限制。所以我有两个问题,第一个是我如何知道第一个队列触发器已完成工作?第二个是,如何确保Python QueueTrigger不开始累积消息?我认为通过为侦听相同队列的两个队列触发器创建单独的Azure函数,可以潜在地解决这两个问题。这样可以减轻双方的负担,但是我不确定这是否是最佳实践。在这里我对问题2寻求更多指导的地方请参见我的问题:Using multiple Azure Functions QueueTriggers to listen on the same storage queue

解决方法

更新

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Threading;

namespace FunctionApp31
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function,"get","post",Route = null)] HttpRequest req,ILogger log)
        {

            string a = "111";

            a=XX(a).Result;

            return new OkObjectResult(a);
        }

        public static async Task<string> XX(string x)
        {
            await Task.Run(()=>{
                Thread.Sleep(3000);
                x = x + "222";
                Console.WriteLine(x);
                }
                );

            return x;
        } 
    }
}

原始答案:

我建议您顺序执行处理逻辑,而不是异步执行。或者,您可以等待异步操作完成后再返回,这样可以确保执行成功后再返回成功。(这样可以避免在队列仍按注释中所述处理时返回结果。)

我注意到您问了一个新问题。我认为您可以扩展实例,而不用创建多个功能的应用程序。 (当然,创建多个功能的应用程序没有问题。)如果您基于消耗计划,则实例将根据负载自动扩展。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?