微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

c# – TPL架构问题

我目前正在开展一个项目,我们面临着并行处理项目的挑战.到目前为止没什么大不了的;)
现在来问题了.我们有一个ID列表,我们定期(每2秒)为每个ID调用StoredProcedure.
每个项目需要单独检查2秒,因为它们是在运行时添加删除的.
此外,我们希望配置最大并行度,因为DB不应同时充满300个线程.
正在处理的项目在完成上一次执行之前不应重新安排进行处理.原因是我们想要防止排队很多项目,以防数据库出现延迟.

现在我们正在使用一个自行开发的组件,它有一个主线程,定期检查需要安排处理的项目.一旦它有了列表,它就会删除那些基于自定义IOCP的线程池,然后使用等待句柄等待正在处理的项目.然后下一次迭代开始. IOCP因为它提供的工作窃取.

我想用TPL / .NET 4版本替换这个自定义实现,我想知道你将如何解决它(理想的简单和易读/可维护).
我知道这篇文章http://msdn.microsoft.com/en-us/library/ee789351.aspx,但它只是限制了使用的线程数量.叶子偷工作,定期执行物品….

理想情况下,它将成为一个通用组件,可用于需要定期为项目列表完成的所有任务.

欢迎任何投入,
TIA
马丁

解决方法

我认为你真的不需要为此直接使用TPL Tasks.对于初学者,我会在 ConcurrentQueue(认值)附近设置 BlockingCollection,而在BlockingCollection上没有设置 BoundedCapacity来存储需要处理的ID.
// Setup the blocking collection somewhere when your process starts up (OnStart for a Windows service)
BlockingCollection<string> idsToProcess = new BlockingCollection<string>();

从那里我将使用BlockingCollection::GetConsumingEnumerableBlockingCollection::GetConsumingEnumerable返回的枚举.在ForEach调用中,您将设置ParallelOptions::MaxDegreeOfParallelism在ForEach的主体内,您将执行存储过程.

现在,一旦存储过程执行完成,您就说您不想重新安排执行至少两秒钟.没问题,安排一个带有回调的System.Threading.Timer,它只会将ID添加回提供的回调中的BlockingCollection.

Parallel.ForEach(
    idsToProcess.GetConsumingEnumerable(),new ParallelOptions 
    { 
        MaxDegreeOfParallelism = 4 // read this from config
    },(id) =>
    {
       // ... execute sproc ...

       // Need to declare/assign this before the delegate so that we can dispose of it inside 
       Timer timer = null;

       timer = new Timer(
           _ =>
           {
               // Add the id back to the collection so it will be processed again
               idsToProcess.Add(id);

               // Cleanup the timer
               timer.dispose();
           },null,// no state,id wee need is "captured" in the anonymous delegate
           2000,// probably should read this from config
           Timeout.Infinite);
    }

最后,当进程关闭时,您将调用BlockingCollection::CompleteAdding,以便使用停止阻塞和完成处理可枚举,并且Parallel :: ForEach将退出.例如,如果这是Windows服务,您将在OnStop执行此操作.

// When ready to shutdown you just signal you're done adding
idsToProcess.CompleteAdding();

更新

您在评论中提出了一个有效的问题,即您可能在任何给定点处理大量ID,并担心每个ID的计时器会产生过多的开销.我绝对同意这一点.因此,在您同时处理大量ID的情况下,我将从使用每个ID的定时器更改为使用另一个队列来保存由单个短间隔计时器监视的“休眠”ID.首先,你需要一个ConcurrentQueue来放置睡着的ID:

ConcurrentQueue<Tuple<string,DateTime>> sleepingIds = new ConcurrentQueue<Tuple<string,DateTime>>();

现在,我在这里使用两部分Tuple进行说明,但您可能希望为它创建一个更强类型的结构(或者至少使用using语句将其替换)以提高可读性.元组具有id和DateTime,表示它何时被放入队列.

现在,您还需要设置将监视此队列的计时器:

Timer wakeSleepingIdsTimer = new Timer(
   _ =>
   {
       DateTime utcNow = DateTime.UtcNow;

       // Pull all items from the sleeping queue that have been there for at least 2 seconds
       foreach(string id in sleepingIds.TakeWhile(entry => (utcNow - entry.Item2).TotalSeconds >= 2))
       {
           // Add this id back to the processing queue
           idsToProcess.Enqueue(id);
       }
   },// no state
   Timeout.Infinite,// no due time
   100 // wake up every 100ms,probably should read this from config
 );

然后你只需要改变Parallel :: ForEach来执行以下操作,而不是为每个设置一个计时器:

(id) =>
{
       // ... execute sproc ...

       sleepingIds.Enqueue(Tuple.Create(id,DateTime.UtcNow)); 
}

原文地址:https://www.jb51.cc/csharp/243161.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐