如何解决Parallel.ForEach MaxDegreeOfParallelism 增加“分块”的奇怪行为
我不确定标题是否有意义,这是我能想到的最好的,所以这是我的场景。
我有一个 ASP.NET Core 应用程序,我更多地将它用作外壳程序和 DI 配置。在 Startup
中,它添加了一堆 IHostedService
作为单例,以及它们的依赖项,也作为单例,除了 sqlConnection
和 DbContext
的小例外,我们将了解它们之后。托管服务是一组类似的服务,它们:
- 侦听来自 GPS 设备的传入报告并将其放入侦听缓冲区。
- 从侦听缓冲区中解析项目并将其放入已解析的缓冲区中。
最终有一个服务读取解析的缓冲区并实际处理解析的报告。它通过将它从缓冲区中取出的报告传递给处理程序并等待它完成以移动到下一个来完成此操作。这在过去一年运行良好,但现在看来我们遇到了可扩展性问题,因为它一次处理一个报告,并且服务器上的平均处理时间为 62 毫秒,其中包括 Dapper 访问数据库以获取所需的数据以及保存更改的 EF Core 行程。
然而,如果处理程序决定报告的信息需要触发后台作业,那么我怀疑它需要 100 毫秒或更长时间才能完成。随着时间的推移,缓冲区填满的速度比处理程序可以处理的速度快,以至于在处理它们之前要保持 10 甚至 100 万份报告。这是一个问题,因为通知会延迟,并且如果在服务器午夜重新启动时缓冲区仍然已满,则可能会丢失数据。
说了这么多,我正试图弄清楚如何使处理并行。经过昨天的大量实验,我决定在缓冲区上使用 Parallel.ForEach
使用 GetConsumingEnumerable()
。这很有效,除了我不知道该怎么办甚至打电话的奇怪行为。随着缓冲区被填满并且 ForEach
对其进行迭代,它将开始将处理“分块”成不断增加的 2 的倍数。分块的大小受 MaxDegreeOfParallelism
设置的影响。例如(N# = 缓冲区中的下一个报告数量):
MDP = 1
- 一次 N3 = 1
- N6 = 2 次li>
- N12 = 4 次
- ...
MDP = 2
- N6 = 1 次
- N12 = 2 次li>
- N24 = 4 次
- ...
MDP = 4
- N12 = 1 次
- N24 = 2 次li>
- N48 = 4 次
- ...
MDP = 8(我的 cpu 核心数)
- N24 = 1 次
- N48 = 2 次li>
- N96 = 4 次
- ...
这可以说比我现在的串行执行更糟糕,因为到一天结束时,它会在实际处理之前缓冲并等待,比如说,50 万份报告。
有没有办法解决这个问题?我对 Parallel.ForEach
不是很有经验,所以从我的角度来看,这是一种奇怪的行为。最终,我正在寻找一种方法来在报告进入缓冲区后立即对其进行并行处理,因此,如果有其他方法可以实现这一点,我会全力以赴。这大致是我对代码的了解。处理报告的处理程序确实使用 IServiceProvider
创建范围并获取 sqlConnection
和 DbContext
的实例。提前感谢您的任何建议!
public sealed class GpsReportService :
IHostedService {
private readonly GpsReportBuffer _buffer;
private readonly Config _config;
private readonly GpsReportHandler _handler;
private readonly ILogger _logger;
public GpsReportService(
GpsReportBuffer buffer,Config config,GpsReportHandler handler,ILogger<GpsReportService> logger) {
_buffer = buffer;
_config = config;
_handler = handler;
_logger = logger;
}
public Task StartAsync(
CancellationToken cancellationToken) {
_logger.Loginformation("GPS Report Service => starting");
Task.Run(Process,cancellationToken).ConfigureAwait(false);// Is ConfigureAwait here correct usage?
_logger.Loginformation("GPS Report Service => started");
return Task.CompletedTask;
}
public Task StopAsync(
CancellationToken cancellationToken) {
_logger.Loginformation("GPS Parsing Service => stopping");
_buffer.CompleteAdding();
_logger.Loginformation("GPS Parsing Service => stopped");
return Task.CompletedTask;
}
// ========================================================================
// Utilities
// ========================================================================
private void Process() {
var options = new ParallelOptions {
MaxDegreeOfParallelism = 8,CancellationToken = CancellationToken.None
};
Parallel.ForEach(_buffer.GetConsumingEnumerable(),options,async report => {
try {
await _handler.ProcessAsync(report).ConfigureAwait(false);
} catch (Exception e) {
if (_config.IsDevelopment) {
throw;
}
_logger.LogError(e,"GPS Report Service");
}
});
}
private async Task ProcessAsync() {
while (!_buffer.IsCompleted) {
try {
var took = _buffer.TryTake(out var report,10);
if (!took) {
continue;
}
await _handler.ProcessAsync(report!).ConfigureAwait(false);
} catch (Exception e) {
if (_config.IsDevelopment) {
throw;
}
_logger.LogError(e,"GPS Report Service");
}
}
}
}
public sealed class GpsReportBuffer :
BlockingCollection<GpsReport> {
}
解决方法
您不能将 Parallel
方法与 async
委托一起使用 - 至少现在还不能。
由于您已经拥有“流水线”风格的架构,我建议您查看 TPL Dataflow。单个 ActionBlock
可能就是您所需要的,一旦您开始工作,TPL Dataflow 中的其他块可能会替换您管道的其他部分。
如果您更喜欢使用现有的缓冲区,那么您应该使用异步并发而不是 Parallel
:
private void Process() {
var throttler = new SemaphoreSlim(8);
var tasks = _buffer.GetConsumingEnumerable()
.Select(async report =>
{
await throttler.WaitAsync();
try {
await _handler.ProcessAsync(report).ConfigureAwait(false);
} catch (Exception e) {
if (_config.IsDevelopment) {
throw;
}
_logger.LogError(e,"GPS Report Service");
}
finally {
throttler.Release();
}
})
.ToList();
await Task.WhenAll(tasks);
}
,
您遇到的是事件流处理/数据流问题,而不是并行性问题。如果您使用适当的类,例如 Dataflow 块、Channels 或 Reactive Extensions,问题将被简化lot。
即使您想使用单个缓冲区和胖工作者方法,合适的缓冲区类是异步 Channel,而不是 BlockingCollection。代码可以变得如此简单:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach(GpsMessage msg in _reader.ReadAllAsync(stopppingToken))
{
await _handler.ProcessAsync(msg);
}
}
第一个选项展示了如何使用 Dataflow 创建管道。二、如何用Channel
代替BlockingCollection
并发处理多个排队的item
使用 Dataflow 的管道
一旦您将流程分解为独立的方法,就可以使用任何库轻松创建处理步骤的管道。
Task<IEnumerable<GpsMessage>> Poller(DateTime time,IList<Device> devices,CancellationToken token=default)
{
foreach(var device in devices)
{
if(token.IsCancellationRequested)
{
break;
}
var msg=await device.ReadMessage();
yield return msg;
}
}
GpsReport Parser(GpsMessage msg)
{
//Do some parsing magic.
return report;
}
async Task<GpsReport> Enrich(GpsReport report,string connectionString,CancellationToken token=default)
{
//Depend on connection pooling to eliminate the cost of connections
//We may have to use a pool of opened connections otherwise
using var con=new SqlConnection(connectionString);
var extraData=await con.QueryAsync<Extra>(sql,new {deviceId=report.DeviceId},token);
report.Extra=extraData;
return report;
}
async Task BulkImport(SqlReport[] reports,CancellationToken token=default)
{
using var bcp=new SqlBulkCopy(...);
using var reader=ObjectReader.Create(reports);
...
await bcp.WriteToServerAsync(reader,token);
}
在 BulkImport 方法中,我使用 FasMember's ObjectReader 在报告上创建 IDataReader 包装器,以便我可以将它们与 SqlBulkCopy 一起使用。另一种选择是将它们转换为 DataTable,但这会在内存中创建一个额外的数据副本。
将所有这些与 Dataflow 结合起来相对容易。
var execOptions=new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 10
}
_poller = new TransformManyBlock<DateTime,GpsBuffer>(time=>Poller(time,devices));
_parser = new TransformBlock<GpsBuffer,GpsReport>(b=>Parser(b),execOptions);
var enricher = new TransformBlock<GpsReport,GpsReport>(rpt=>Enrich(rpt,connStr),execOptions);
_batch = new BatchBlock<GpsReport>(50);
_bcpBlock = new ActionBlock<GpsReport[]>(reports=>BulkImport(reports));
每个块都有一个输入和输出缓冲区(ActionBlock 除外)。每个块负责处理其输入缓冲区中的消息并对其进行处理。默认情况下,每个块只使用一个工作任务,但可以更改。消息顺序是保持不变的,因此如果我们为 parser
块使用例如 10 个工作任务,消息仍将按照收到的顺序发出。
接下来是链接块。
var linkOptions=new DataflowLinkOptions {PropagateCompletion=true};
_poller.LinkTo(_parser,options);
_parser.LinkTo(_enricher,options);
_enricher.LinkTo(_batch,options);
_batch.LinkTo(_bcpBlock,options);
之后,我们可以随时使用计时器来“ping”头块,即轮询器:
private void Ping(object state)
{
_poller.Post(DateTime.Now);
}
public Task StartAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Timed Hosted Service running.");
_timer = new Timer(Ping,null,TimeSpan.Zero,TimeSpan.FromSeconds(5));
return Task.CompletedTask;
}
为了优雅地停止管道,我们在头块上调用 Complete()
并在最后一个块上等待 Completion
任务。假设托管服务类似于 timed background service example:
public Task StopAsync(CancellationToken cancellationToken)
{
....
_timer?.Change(Timeout.Infinite,0);
_poller.Complete();
await _bcpBlock.Completion;
...
}
使用通道作为异步队列
对于异步发布者/订阅者场景,Channel 是比 BlockingCollection 更好的替代方案。粗略地说,它是一个异步队列,通过强制调用者使用 ChannelWriter 和 ChannelReader 类来极端地阻止发布者读取或订阅者写入。事实上,通常只传递这些类,而不是 Channel 实例本身。
在您的发布代码中,您可以创建一个 Channel<T>
并将其阅读器传递给 GpsReportService
服务。让我们假设发布者是另一个实现 IGpsPublisher
接口的服务:
public interface IGpsPublisher
{
ChannelReader<GspMessage> Reader{get;}
}
和实现
Channel<GpsMessage> _channel=Channel.CreateUnbounded<GpsMessage>();
public ChannelReader<GspMessage> Reader=>_channel;
private async void Ping(object state)
{
foreach(var device in devices)
{
if(token.IsCancellationRequested)
{
break;
}
var msg=await device.ReadMessage();
await _channel.Writer.WriteAsync(msg);
}
}
public Task StartAsync(CancellationToken stoppingToken)
{
_timer = new Timer(Ping,TimeSpan.FromSeconds(5));
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_timer?.Change(Timeout.Infinite,0);
_channel.Writer.Complete();
}
这可以作为依赖项传递给 GpsReportService,由 DI 容器解析:
public sealed class GpsReportService : BackgroundService
{
private readonly ChannelReader<GpsMessage> _reader;
public GpsReportService(
IGpsPublisher publisher,...)
{
_reader = publisher.Reader;
...
}
并使用
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await foreach(GpsMessage msg in _reader.ReadAllAsync(stopppingToken))
{
await _handler.ProcessAsync(msg);
}
}
一旦发布者完成,订阅者循环也会在处理完所有消息后完成。
要并行处理,可以同时启动多个循环:
async Task Process(ChannelReader<GgpsMessage> reader,CancellationToken token)
{
await foreach(GpsMessage msg in reader.ReadAllAsync(token))
{
await _handler.ProcessAsync(msg);
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var tasks=Enumerable.Range(0,10)
.Select(_=>ProcessReader(_reader,stoppingToken))
.ToArray();
await Task.WhenAll(tasks);
}
解释管道
我有类似的情况:每15分钟我向航空公司索取机票销售报告(实际上是GDSs),解析它们以提取数据和票号,下载每张机票的机票记录以获取一些额外数据并将所有内容保存到数据库中。我必须为 20 多个城市(每个城市的票务报告)执行此操作,每个报告都有 10 到 10 万多张票。
这几乎需要一个管道。使用您的示例,您可以使用以下步骤/块创建管道:
- 侦听 GPS 消息并发出未解析消息。
- 解析消息并发出解析后的消息
- 加载每条消息所需的任何额外数据并发出组合记录
- 处理组合记录并发出结果
- (可选)批处理结果
- 将结果保存到数据库
所有三个选项(Dataflow、Channels、Rx)都负责步骤之间的缓冲。 Dataflow 是用于处理独立事件的管道的一些组装所需的库,Rx 是现成的,用于分析时间很重要的事件流(例如计算滑动窗口中的平均速度),Channels 是乐高积木,除了需要放在一起。
为什么不是 Parallel.ForEach
Parallel.ForEach
用于数据并行,而不是异步操作。它旨在处理大块的内存数据,彼此独立。 Amdah's Law 解释说并行化的好处受到操作的同步部分的限制,因此所有数据并行性库都试图通过分区来减少这种情况,并使用一个核心/机器/节点来处理每个分区.
Parallel.ForEach
还通过对数据进行分区并为每个 CPU 内核使用大约一个工作任务来减少内核之间的同步。它甚至会使用当前线程,这会导致错误地假设它正在阻塞。当所有内核都忙时,为什么不使用线程?它无论如何都无法运行。
Parallel.ForEach
默认采用块分区,旨在减少 CPU 密集型应用程序中的同步开销,但在某些使用场景中可能会导致出现问题行为。可以通过将 Partitioner<T>
而不是 IEnumerable<T>
作为参数传递来禁用块分区:
Parallel.ForEach(Partitioner.Create(_buffer.GetConsumingEnumerable(),EnumerablePartitionerOptions.NoBuffering),options,...
您还可以在本文中找到专门为 BlockingCollection<T>
量身定制的自定义分区器:ParallelExtensionsExtras Tour – #4 – BlockingCollectionExtensions
也就是说,Parallel.ForEach
不是异步友好的,这意味着它不理解异步委托。传递的 lambda 是 async void
,也就是 something to avoid。所以我建议改用 ActionBlock<T>
。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。