如何解决如何对IEnumerable的所有元素执行Polly策略,并在出现第一个未处理的异常时停止
Polly库的策略,例如Bulkhead
,Retry
等,包含具有许多重载(18)的方法ExecuteAsync
,但是它们都不允许对IEnumerable
的所有元素执行策略并收集结果。似乎整个库都集中在执行单个动作的目标上,而将管理多个执行的责任留给了客户端代码。我想通过对所有Polly策略(IAsyncPolicy
接口的所有实现)实施扩展方法来解决此遗漏,并使用以下签名:
public static Task<TResult[]> ExecuteAsync<TSource,TResult>(
this IAsyncPolicy policy,IEnumerable<TSource> source,Func<TSource,Task<TResult>> action,bool continueOnCapturedContext = false,bool onErrorContinue = false)
continueOnCapturedContext
参数控制是否在捕获的同步上下文上继续,应该只传递
到原生ExecuteAsync
方法:
Task<TResult> IAsyncPolicy.ExecuteAsync<TResult>(
Func<CancellationToken,CancellationToken cancellationToken,bool continueOnCapturedContext);
onErrorContinue
参数是此问题的最重要方面,因为它控制了策略失败时的行为。我的意图是将这种扩展方法与数千个元素一起使用,并且在我的策略¹未预期/无法处理的任何例外情况下,我想迅速而优雅地终止整个执行过程。如果参数onErrorContinue
具有默认值false
,则第一个未处理的异常将导致取消所有挂起的操作,并且所有开始的操作完成后,整个执行应终止。在onErrorContinue: true
的相反情况下,所有元素应由策略处理。最后,应该传播所有异常,并将其捆绑在AggregateException
中,与onErrorContinue
值无关。
如何实现此扩展方法?
此方法的假设使用场景:
var policy = Policy
.BulkheadAsync(maxParallelization: 10,maxQueuingActions: Int32.MaxValue)
.WrapAsync(Policy
.Handle<HttpRequestException>()
.WaitAndRetryAsync(retryCount: 3,sleepDurationProvider: n => TimeSpan.FromMilliseconds(1000 * n))
);
var urls = Enumerable.Range(1,1000).Select(n => n.ToString());
var random = new Random(0);
string[] results = await policy.ExecuteAsync(urls,async url =>
{
await Task.Delay(500); // Simulate a web request
lock (random) if (random.NextDouble() < 0.66)
throw new HttpRequestException($"Url #{url} Failed");
return url;
},onErrorContinue: false);
¹这种情况很少在生产中发生,但可能在开发过程中频繁发生,并且可能会损害生产率。
解决方法
这是我对using System;
namespace ConsoleApp1
{
abstract class Vehicle
{
public abstract string Name { get; }
public override string ToString() => $"This vehicle is a {Name}.";
// To keep the output identical to your example,use this method
// public override string ToString() => $"We don't know what kind of vehicle this is.";
// You then have to override ToString in Sedan,too. But I think there's no reason
// why you'd want to have to override ToString everywhere with the same code.
}
internal class Sedan : Vehicle
{
public override string Name => "Sedan";
}
abstract class AbstractLoad
{
public abstract string Name { get; }
public override string ToString() => Name;
}
internal class Oil : AbstractLoad
{
public override string Name => "Oil";
}
internal class Lumber : AbstractLoad
{
public override string Name => "Lumber";
}
class Semi<TLoad> : Vehicle where TLoad : AbstractLoad
{
public override string Name => "Semi";
public TLoad Load { get; set; }
public override string ToString() => $"This vehicle is a semi carrying {Load}.";
}
class Program
{
static void Main(string[] args) {
var vehicle = new Semi<Oil> { Load = new Oil() };
DescribeVehicle(vehicle);
}
static void DescribeVehicle(Vehicle vehicle) {
Console.WriteLine(vehicle.ToString());
}
}
}
方法的实现。 ExecuteAsync
用于在发生异常的情况下取消挂起的操作。当有更重要的例外要传播时,CancellationTokenSource
可以很好地忽略Task.WhenAll
。最终,OperationCanceledException
任务没有进行Task.WhenAll
返回,以保留所有异常。
await
模拟public static Task<TResult[]> ExecuteAsync<TSource,TResult>(
this IAsyncPolicy policy,IEnumerable<TSource> source,Func<TSource,Task<TResult>> action,bool continueOnCapturedContext = false,bool onErrorContinue = false)
{
// Arguments validation omitted
var cts = new CancellationTokenSource();
var token = !onErrorContinue ? cts.Token : default;
var tasks = source.Select(async (item) =>
{
try
{
return await policy.ExecuteAsync(async _ =>
{
return await action(item);
},token,continueOnCapturedContext);
}
catch
{
if (!onErrorContinue) cts.Cancel();
throw;
}
}).ToArray();
var whenAll = Task.WhenAll(tasks);
_ = whenAll.ContinueWith(_ => cts.Dispose(),TaskScheduler.Default);
return whenAll;
}
行为(在这种情况下是理想的),否则为is quite tricky(通过异步/等待)。因此,我很高兴通过使用小型dirty Task.WhenAll
来避免这种麻烦,以便最终dispose ContinueWith
。
here介绍了处理多个异常的另一种方法。此解决方案传播了一个嵌套的CancellationTokenSource
,听起来很丑陋,但实际上是可以的,因为AggregateException
使用异步方法无论如何都消除了一层嵌套。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。