如何解决强行终止由 Parallel.ForEach
使用Parallel.ForEach()
时,有没有办法在特定线程上强制执行Thread.Abort
?
我知道不推荐使用 Thread.Abort()
。
我正在对数十万个实体的集合运行 Parallel.ForEach()
。
在某些情况下,循环处理可追溯到 30 年前的数据。我们遇到了一些线程挂起的问题。虽然我们试图掌握这一点,但希望将实施称为故障安全。如果线程运行超过 x 时间,强行终止线程。
我不想使用取消令牌。
这会很丑陋,但还没有找到其他解决方案。是否有可能:
- 让每个线程打开一个计时器。将
Thread.CurrentThread
的引用传递给计时器 - 如果计时器已过且处理尚未完成,请对该计时器调用
Thread.Abort
- 如果需要,信号事件等待句柄以允许下一个患者处理
private void ProcessEntity(Processparams param,ConcurrentDictionary<long,string> entities)
{
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 2
};
Parallel.ForEach(person,options,p =>
{
Processperson(param,p);
});
}
internal void Processperson(Processparams param,keyvaluePair<long,string> p)
{
try
{
//...
}
catch (Exception ex)
{
}
param.eventWaitHandle?.WaitOne();
}
解决方法
似乎 Parallel.ForEach
方法在其工作线程被中止时没有弹性,并且行为不一致。其他时候传播一个包含 AggregateException
的 ThreadAbortException
,其他时候它直接抛出一个 ThreadAbortException
,一个丑陋的堆栈跟踪显示 its internals。
下面是一个自定义的 ForEachTimeoutAbort
方法,它提供了 Parallel.ForEach
的基本功能,没有取消、循环状态、自定义分区器等高级功能。它的特殊功能是 TimeSpan timeout
参数,这会中止任何需要很长时间才能完成的项目的工作线程。
public static void ForEachTimeoutAbort<TSource>(
this IEnumerable<TSource> source,Action<TSource> action,int maxDegreeOfParallelism,TimeSpan timeout)
{
// Arguments validation omitted
var semaphore = new SemaphoreSlim(maxDegreeOfParallelism,maxDegreeOfParallelism);
var exceptions = new ConcurrentQueue<Exception>();
try
{
foreach (var item in source)
{
semaphore.Wait();
if (!exceptions.IsEmpty) { semaphore.Release(); break; }
ThreadPool.QueueUserWorkItem(_ =>
{
var timer = new Timer(state => ((Thread)state).Abort(),Thread.CurrentThread,Timeout.Infinite,Timeout.Infinite);
try
{
timer.Change(timeout,Timeout.InfiniteTimeSpan);
action(item);
}
catch (Exception ex) { exceptions.Enqueue(ex); }
finally
{
using (var waitHandle = new ManualResetEvent(false))
{
timer.Dispose(waitHandle);
// Wait the timer's callback (if it's queued) to complete.
waitHandle.WaitOne();
}
semaphore.Release();
}
});
}
}
catch (Exception ex) { exceptions.Enqueue(ex); }
// Wait for all pending operations to complete
for (int i = 0; i < maxDegreeOfParallelism; i++) semaphore.Wait();
if (!exceptions.IsEmpty) throw new AggregateException(exceptions);
}
ThreadAbortException
的一个特点是它无法被捕获。所以为了防止并行循环过早完成,必须从catch
块中调用Thread.ResetAbort
方法。
用法示例:
ForEachTimeoutAbort(persons,p =>
{
try
{
ProcessPerson(param,p);
}
catch (ThreadAbortException)
{
Thread.ResetAbort();
}
},maxDegreeOfParallelism: 2,timeout: TimeSpan.FromSeconds(30));
.NET Framework 是唯一可以使用 ForEachTimeoutAbort
方法的平台。对于 .NET Core 和 .NET 5,可以尝试将其转换为 ForEachTimeoutInterrupt
,并将对 Abort
的调用替换为对 Interrupt
的调用。中断线程不如中止线程有效,因为它仅在线程处于等待/睡眠模式时有效。但在某些情况下可能就足够了。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。