如何解决增加并行API调用的延迟
我正在使用Polly进行并行API调用。但是服务器每秒不能处理超过25个呼叫,因此我想知道是否有办法在每25个呼叫之后添加1s延迟?
var policy = Policy
.Handle<HttpRequestException>()
.RetryAsync(3);
foreach (var mediaItem in uploadedMedia)
{
var mediaRequest = new HttpRequestMessage { *** }
async Task<string> func()
{
var response = await client.SendAsync(mediaRequest);
return await response.Content.ReadAsstringAsync();
}
tasks.Add(policy.ExecuteAsync(() => func()));
}
await Task.WhenAll(tasks);
foreach (var mediaItem in uploadedMedia.Items)
{
var mediaRequest = new HttpRequestMessage
{
RequestUri = new Uri($"https://u48ydao1w4.execute-api.ap-southeast-2.amazonaws.com/test/downloads/thumbnails/{mediaItem.filename.S}"),Method = HttpMethod.Get,Headers = {
{ "id-token",id_Token },{ "access-token",access_Token }
}
};
async Task<string> func()
{
if (count == 24)
{
Thread.Sleep(1000);
count = 0;
}
var response = await client.SendAsync(mediaRequest);
count++;
return await response.Content.ReadAsstringAsync();
}
tasks.Add(policy.ExecuteAsync(() => func()));
}
await Task.WhenAll(tasks);
foreach (var t in tasks)
{
var postResponse = await t;
urls.Add(postResponse);
}
解决方法
执行此操作的方法很多,但是编写一个简单的线程安全的可重复使用的异步速率限制器相当容易。
异步方法的优点是,它不会阻塞线程池线程,效率很高,并且可以在现有的异步工作流和管道(如TPL Dataflow和Reactive Extensions)中很好地工作。
示例
// 3 calls every 3 seconds as an example
var rateLimiter = new RateLimiter(3,TimeSpan.FromSeconds(3));
// create some work
var task1 = Task.Run(async () =>
{
for (var i = 0; i < 5; i++)
{
await rateLimiter.WaitAsync();
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} : {DateTime.Now}");
}
}
);
var task2 = Task.Run(async () =>
{
for (var i = 0; i < 5; i++)
{
await rateLimiter.WaitAsync();
Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId} : {DateTime.Now}");
}
}
);
await Task.WhenAll(task1,task2);
输出
4 : 10/25/2020 05:16:15
5 : 10/25/2020 05:16:15
4 : 10/25/2020 05:16:15
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:18
5 : 10/25/2020 05:16:21
5 : 10/25/2020 05:16:21
5 : 10/25/2020 05:16:21
4 : 10/25/2020 05:16:24
用法
private RateLimiter _rateLimiter = new RateLimiter(25,TimeSpan.FromSeconds(1));
...
async Task<string> func()
{
await _rateLimiter.WaitAsync();
var response = await client.SendAsync(mediaRequest);
return await response.Content.ReadAsStringAsync();
}
课程
public class RateLimiter
{
private readonly CancellationToken _cancellationToken;
private readonly TimeSpan _timeSpan;
private bool _isProcessing;
private readonly int _count;
private readonly Queue<DateTime> _completed = new Queue<DateTime>();
private readonly Queue<TaskCompletionSource<bool>> _waiting = new Queue<TaskCompletionSource<bool>>();
private readonly object _sync = new object();
public RateLimiter(int count,TimeSpan timeSpan,CancellationToken cancellationToken = default)
{
_count = count;
_timeSpan = timeSpan;
_cancellationToken = cancellationToken;
}
private void Cleanup()
{
// if the cancellation was request,we need to throw on all waiting items
while (_cancellationToken.IsCancellationRequested && _waiting.Any())
if (_waiting.TryDequeue(out var item))
item.TrySetCanceled();
_waiting.Clear();
_completed.Clear();
_isProcessing = false;
}
private async Task ProcessAsync()
{
try
{
while (true)
{
_cancellationToken.ThrowIfCancellationRequested();
var time = DateTime.Now - _timeSpan;
lock (_sync)
{
// remove anything out of date from the queue
while (_completed.Any() && _completed.Peek() < time)
_completed.TryDequeue(out _);
// signal waiting tasks to process
while (_completed.Count < _count && _waiting.Any())
{
if (_waiting.TryDequeue(out var item))
item.TrySetResult(true);
_completed.Enqueue(DateTime.Now);
}
if (!_waiting.Any() && !_completed.Any())
{
Cleanup();
break;
}
}
var delay = (_completed.Peek() - time) + TimeSpan.FromMilliseconds(20);
if (delay.Ticks > 0)
await Task.Delay(delay,_cancellationToken);
Console.WriteLine(delay);
}
}
catch (OperationCanceledException)
{
lock (_sync)
Cleanup();
}
}
public ValueTask WaitAsync()
{
// ReSharper disable once InconsistentlySynchronizedField
_cancellationToken.ThrowIfCancellationRequested();
lock (_sync)
{
try
{
if (_completed.Count < _count && !_waiting.Any())
{
_completed.Enqueue(DateTime.Now);
return new ValueTask();
}
var tcs = new TaskCompletionSource<bool>();
_waiting.Enqueue(tcs);
return new ValueTask(tcs.Task);
}
finally
{
if (!_isProcessing)
{
_isProcessing = true;
_ = ProcessAsync();
}
}
}
}
}
注1 :最好在最大并行度下使用它。
注2 :尽管我对此进行了测试,但我只是将其作为一种新颖的解决方案写给了这个答案。
,只需快速浏览一下代码,也许另一个类似的解决方案就是添加Thread.Sleep(calculatedDelay):
foreach (var mediaItem in uploadedMedia.Items)
{
Thread.Sleep(calculatedDelay);
var mediaRequest = new HttpRequestMessage
计算出的延迟是基于1000/25的某个值。
但是,我觉得您需要比延迟某些指定值更好的解决方案,因为您不能确定传输数据时的开销延迟问题。另外,您也没有指出达到25+的限制时会发生什么,服务器将如何响应..您是否收到错误消息或处理得更优雅?也许在这里您可以找到更可靠的解决方案?
, Polly库当前lacks a rate-limiting policy(请求/时间)。幸运的是,使用SemaphoreSlim
可以轻松实现此功能。进行速率限制的技巧是在获取后将信号量的容量配置为等于被除数(请求),并将信号量的Release
延迟等于除数(时间)的时间范围信号量。这样,速率限制将始终应用于任何可能的时间窗口。
更新:我意识到了the Polly library is extensible,并允许实施具有自定义功能的自定义策略。因此,我放弃了最初的建议,转而支持下面的自定义RateLimitAsyncPolicy
类:
public class RateLimitAsyncPolicy : AsyncPolicy
{
private readonly SemaphoreSlim _semaphore;
private readonly TimeSpan _timeUnit;
public RateLimitAsyncPolicy(int maxOperationsPerTimeUnit,TimeSpan timeUnit)
{
// Arguments validation omitted
_semaphore = new SemaphoreSlim(maxOperationsPerTimeUnit);
_timeUnit = timeUnit;
}
protected async override Task<TResult> ImplementationAsync<TResult>(
Func<Context,CancellationToken,Task<TResult>> action,Context context,CancellationToken cancellationToken,bool continueOnCapturedContext)
{
await _semaphore.WaitAsync(cancellationToken)
.ConfigureAwait(continueOnCapturedContext);
ScheduleSemaphoreRelease();
return await action(context,cancellationToken).ConfigureAwait(false);
}
private async void ScheduleSemaphoreRelease()
{
await Task.Delay(_timeUnit);
_semaphore.Release();
}
}
此政策可确保在maxOperationsPerTimeUnit
大小的任何时间窗口内,仅开始个操作。不考虑操作的持续时间。换句话说,在任何给定时刻可以并发运行多少操作没有任何限制。 timeUnit
策略可以有选择地施加此限制。可以结合使用这两个策略(BulkheadAsync
和RateLimitAsyncPolicy
),如下例所示:
BulkheadAsync
该顺序仅对var policy = Policy.WrapAsync
(
Policy
.Handle<HttpRequestException>()
.RetryAsync(retryCount: 3),new RateLimitAsyncPolicy(
maxOperationsPerTimeUnit: 25,timeUnit: TimeSpan.FromSeconds(1)),Policy.BulkheadAsync( // Optional
maxParallelization: 25,maxQueuingActions: Int32.MaxValue)
);
策略很重要,出于documentation中所述的原因,该顺序必须放在第一位:
RetryAsync
:通常位于最里面,除非包装了最后的BulkheadPolicy
。当然在任何TimeoutPolicy
中。WaitAndRetry
有意限制并行化。您希望并行化专门用于运行委托,而不是等待重试。
类似地,Bulkhead
必须遵循RateLimitPolicy
,以便每次重试都被视为独立操作,并计入速率限制。
您应该使用Microsoft的Reactive Framework(又名Rx)-NuGet System.Reactive
并添加using System.Reactive.Linq;
-然后您可以执行以下操作:
HttpRequestMessage MakeMessage(MediaItem mi) => new HttpRequestMessage
{
RequestUri = new Uri($"https://u48ydao1w4.execute-api.ap-southeast-2.amazonaws.com/test/downloads/thumbnails/{mi.filename}"),Method = HttpMethod.Get,Headers = {
{ "id-token",id_Token },{ "access-token",access_Token }
}
};
var urls = await
uploadedMedia
.Items
.ToObservable()
.Buffer(24)
.Zip(Observable.Timer(TimeSpan.Zero,TimeSpan.FromSeconds(1.0)),(mrs,_) => mrs)
.SelectMany(mrs => mrs.ToObservable().SelectMany(mr => Observable.FromAsync(() => client.SendAsync(MakeMessage(mr)))))
.SelectMany(x => Observable.FromAsync(() => x.Content.ReadAsStringAsync()))
.ToList();
我无法对其进行测试,但它应该相当接近。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。