
How to solve the HttpClient TimeOut and Polly Bulkhead Policy problem (with BatchBlock and ActionBlock, BufferBlock, Channels, BlockingCollection, and Parallel ForEach)

I am getting many timeout exceptions while using the Polly Bulkhead policy. The policy helps me limit the number of concurrent calls sent to a particular host, but it seems that the HttpClient timeout applies to the whole delegate.

I am configuring it through IHttpClientFactory with the following code:

services.AddHttpClient(string.Empty)
    .AddPolicyHandler(GetBulkheadPolicy(100));


private static IAsyncPolicy<HttpResponseMessage> GetBulkheadPolicy(int maxConcurrentRequests)
{
    return Policy.BulkheadAsync(maxConcurrentRequests, int.MaxValue)
        .AsAsyncPolicy<HttpResponseMessage>();
}

My problem is that I want the timeout to affect only the request itself and not the bulkhead policy, because the behavior I am trying to achieve is the following (a rough sketch of one possible layering follows the list):

  • Limit the number of concurrent requests to a particular host
  • Wait indefinitely until there is capacity to send a request (Polly throws an exception when its queue is full)
  • Send the request to the host and apply a timeout to it, e.g. the default timeout.
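For illustration, a rough sketch of that layering. It assumes Polly's optimistic TimeoutAsync policy, that the policy added first via AddPolicyHandler ends up outermost, and that the client-level timeout can be disabled with Timeout.InfiniteTimeSpan; this code is not from the original question:

services.AddHttpClient(string.Empty)
    // Disable the client-level timeout so it cannot cancel time
    // spent waiting in the bulkhead queue.
    .ConfigureHttpClient(c => c.Timeout = System.Threading.Timeout.InfiniteTimeSpan)
    // Outermost: waits (unbounded queue) for one of the 100 slots.
    .AddPolicyHandler(GetBulkheadPolicy(100))
    // Innermost: times only the HTTP call itself.
    .AddPolicyHandler(Policy.TimeoutAsync<HttpResponseMessage>(TimeSpan.FromSeconds(100)));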

I have already implemented this behavior using a Semaphore instead of the Polly Bulkhead policy, but I would like to encapsulate that code in a policy.

Thanks.

Solution

I have put these examples together to demonstrate different options for throttling HttpClient requests. I have to emphasize that they are only examples and far from production code, so please review them with that in mind.

The sample code below shows how to issue requests in a fire-and-forget manner (so the callers do not care about the responses). The solutions assume that there are more requests than available throughput; in other words, the producer is faster than the consumer, which is why some sort of queuing mechanism is needed to handle the imbalance.
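The samples omit their using directives; they are assumed to need roughly the following:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;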

With BatchBlock and ActionBlock

public class ThrottlingWithBatchBlock
{
    static readonly HttpClient client = new();
    // Collects incoming requests into batches of 100.
    private readonly BatchBlock<HttpRequestMessage> requests = new(100);
    private ActionBlock<HttpRequestMessage[]> consumer;

    public ThrottlingWithBatchBlock()
    {
        // Processes up to 100 batches concurrently.
        consumer = new(
            reqs => ConsumerAsync(reqs),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 100 });
        requests.LinkTo(consumer);
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    private async Task ConsumerAsync(HttpRequestMessage[] requests)
    {
        // Requests within a single batch are sent one after another.
        foreach (var request in requests)
            await client.SendAsync(request).ConfigureAwait(false);
    }
}
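A hypothetical driver for this class (the URL is made up). Note that a BatchBlock only releases requests in full groups of 100, so a trailing partial batch stays inside the block until its TriggerBatch() method is called:

var throttler = new ThrottlingWithBatchBlock();
for (var i = 0; i < 1_000; i++)
{
    await throttler.IssueNewRequest(
        new HttpRequestMessage(HttpMethod.Get, "https://example.com/api/items"));
}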

With BufferBlock

public class ThrottlingWithBufferBlock
{
    static readonly HttpClient client = new();
    // Holds at most 100 pending requests; SendAsync waits when the buffer is full.
    private readonly BufferBlock<HttpRequestMessage> requests = new(
            new DataflowBlockOptions { BoundedCapacity = 100 });

    public ThrottlingWithBufferBlock()
    {
        _ = ConsumerAsync();
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.SendAsync(request);
    }

    async Task ConsumerAsync()
    {
        while (await requests.OutputAvailableAsync())
        {
            var request = await requests.ReceiveAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
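As written, this variant has a single consumer loop, so requests are sent one at a time and the BoundedCapacity of 100 only limits how many can queue up. If parallel sends are wanted, one variation (a sketch, not part of the original answer) is to start several competing consumer loops from the constructor:

public ThrottlingWithBufferBlock(int consumerCount = 100)
{
    // Each loop receives from the same block; any given request
    // is handed to exactly one consumer.
    for (var i = 0; i < consumerCount; i++)
        _ = ConsumerAsync();
}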

With Channels

public class ThrottlingWithChannels
{
    static readonly HttpClient client = new();
    // Bounded to 100 pending requests; WriteAsync waits when the channel is full.
    private Channel<HttpRequestMessage> requests = Channel.CreateBounded<HttpRequestMessage>(
            new BoundedChannelOptions(100) { SingleWriter = true, SingleReader = false });

    public ThrottlingWithChannels()
    {
        _ = ConsumerAsync();
    }

    public async Task IssueNewRequest(HttpRequestMessage request)
    {
        await requests.Writer.WaitToWriteAsync();
        await requests.Writer.WriteAsync(request);
    }

    async Task ConsumerAsync()
    {
        while (await requests.Reader.WaitToReadAsync())
        {
            var request = await requests.Reader.ReadAsync();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
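WaitToReadAsync returns false once the writer side has been completed, which gives this variant a natural shutdown path. A sketch of such a method (the method name is made up; Writer.Complete is the standard Channel API):

public void CompleteRequests()
{
    // After this call no more requests can be written, and the
    // consumer loop exits once the queued requests are drained.
    requests.Writer.Complete();
}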

With BlockingCollection

public class ThrottlingWithBlockingCollection
{
    static readonly HttpClient client = new();
    private BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithBlockingCollection()
    {
        // Start 100 competing consumers.
        _ = Enumerable.Range(1, 100)
            .Select(_ => ConsumerAsync()).ToArray();
    }

    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }

    async Task ConsumerAsync()
    {
        while (true)
        {
            // Take blocks (on a thread pool thread) until a request is available.
            var request = requests.Take();
            await client.SendAsync(request).ConfigureAwait(false);
        }
    }
}
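The consumers above loop forever because Take blocks indefinitely. BlockingCollection's built-in way to stop them is CompleteAdding paired with GetConsumingEnumerable, as in this variation of the consumer (a sketch, not from the original answer):

async Task ConsumerAsync()
{
    // Blocks while the collection is empty and ends once
    // CompleteAdding() has been called and the items are drained.
    foreach (var request in requests.GetConsumingEnumerable())
        await client.SendAsync(request).ConfigureAwait(false);
}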

With Parallel ForEach

public class ThrottlingWithParallelForEach
{
    static readonly HttpClient client = new();
    private BlockingCollection<HttpRequestMessage> requests = new();

    public ThrottlingWithParallelForEach()
    {
        // GetConsumingEnumerable is needed here: enumerating the
        // BlockingCollection directly would only walk a snapshot
        // instead of consuming (and waiting for) new items.
        _ = requests.GetConsumingEnumerable().ParallelAsyncForEach(
            async request => await client.SendAsync(request).ConfigureAwait(false), 100);
    }

    public Task IssueNewRequest(HttpRequestMessage request)
    {
        requests.Add(request);
        return Task.CompletedTask;
    }
}
//Based on https://codereview.stackexchange.com/a/203487
public static partial class ParallelForEach
{
    public static async Task ParallelAsyncForEach<T>(this IEnumerable<T> source, Func<T, Task> body, int degreeOfParallelism)
    {
        var toBeProcessedJobs = new HashSet<Task>();
        var remainingJobsEnumerator = source.GetEnumerator();

        // Starts the next job, if any; returns false when the source is exhausted.
        bool AddNewJob()
        {
            if (!remainingJobsEnumerator.MoveNext())
                return false;

            var readyToProcessJob = body(remainingJobsEnumerator.Current);
            toBeProcessedJobs.Add(readyToProcessJob);
            return true;
        }

        // Fill up to the requested degree of parallelism, stopping early
        // when the source has fewer items than that.
        while (toBeProcessedJobs.Count < degreeOfParallelism && AddNewJob())
        {
        }

        // Replace each completed job with the next one until the source is drained.
        while (toBeProcessedJobs.Count > 0)
        {
            Task processed = await Task.WhenAny(toBeProcessedJobs).ConfigureAwait(false);
            toBeProcessedJobs.Remove(processed);
            AddNewJob();
        }
    }
}
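A usage sketch for the extension on its own, outside of the throttler class (the URL is made up):

var client = new HttpClient();
var requests = Enumerable.Range(0, 1_000)
    .Select(i => new HttpRequestMessage(
        HttpMethod.Get, $"https://example.com/api/items/{i}"));

await requests.ParallelAsyncForEach(
    request => client.SendAsync(request),
    degreeOfParallelism: 100);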
