微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

c# – 这是TPL Dataflow的工作吗?

我在不同的任务上运行一个非常典型的生产者/消费者模型.

Task1:从二进制文件读取byte []批次,并为每个字节数组集合启动一个新任务. (操作批量用于内存管理目的).

任务2-n:这些是工作任务,每个操作都是通过字节数组的传入集合(来自Tasks1)进行操作,并对字节数组进行序列化,按照某些条件进行排序,然后存储一个结果对象的集合(每个字节数组并行字典反序列化成这样的对象)

任务(n 1)我选择了一个并发字典,因为这个任务的任务是将并行字典中存储的那些集合按照与从Task1起源的顺序相同的顺序进行合并.我通过传递一个collectionID(对于Task1中的每个新集合都是int类型,并且递增)一直从Task1到此任务来实现.此任务基本检查下一个预期的collectionID是否已经存储在并发字典中,如果是,则将其添加到Final Queue并检查并发字典中的下一个集合.

现在,从我所看到的和我看过的视频来看,TPL Dataflow似乎是这种制作人/消费者模型的完美选择.我似乎无法设计出一个设计,因此我从来没有使用过TPL Dataflow.在吞吐量和延迟方面,这个库甚至可以完成任务?我目前在最终的集合中处理了250万字节的数组,从而处理每秒的对象. TPL数据流可以帮助简化吗?我对以下问题的答案特别感兴趣:TPL数据流可以在任务1中生成工作任务时保留收集批次的顺序,并在工作任务完成工作后重新合并它能优化事情吗?对于整个结构的剖析,我觉得由于旋转而浪费了很多时间,并且涉及太多并发收藏.

任何想法,想法?

解决方法

编辑:原来我错了. TransformBlock确实返回的项目与他们进入的顺序相同,即使它被配置为并行性.因为这样,我原来的答案中的代码完全没用,而是可以使用正常的TransformBlock.

原来的答案:

据我所知,.Net中只有一个并行构造支持按照他们进入的顺序返回处理的项目:PLINQ with AsOrdered().但是在我看来,PLINQ不适合你想要的.

另一方面,TPL Dataflow很适合,但是它并没有一个可以同时支持并行和返回项目的块(TransformBlock支持它们,而不是在同一时间).幸运的是,数据流块的设计考虑到可组合性,因此我们可以构建我们自己的块.

但首先,我们必须弄清楚如何排序结果.使用一个并行字典,就像你所建议的一样,以及一些同步机制,一定会奏效.但我认为有一个更简单的解决方案:使用任务队列.在输出任务中,您将出现一个Task,等待它完成(异步),当它发生时,会发送其结果.当队列为空时,我们仍然需要一些同步,但如果我们选择使用巧妙的队列,我们​​可以免费获得同步.

所以,一般的想法是这样的:我们正在写的是一个IPropagatorBlock,有一些输入和一些输出.创建自定义IPropagatorBlock的最简单方法是创建一个处理输入的块,另一个生成结果的块将其视为使用DataflowBlock.Encapsulate()处理的块.

输入块将必须以正确的顺序处理传入的项目,因此在那里没有并行化.它将创建一个新的任务(实际上是一个TaskCompletionSource,以便我们稍后可以设置任务的结果),将其添加到队列中,然后发送项目进行处理,以及一些方法来设置正确任务的结果.因为我们不需要链接这个块到任何东西,我们可以使用一个ActionBlock.

输出块将不得不从队列中取出任务,异步等待它们,然后发送它们.但是由于所有块都嵌入在其中,并且采用代理的块具有异步等待内置,这将非常简单:新的TransformBlock< Task& TOutput>,TOutput>(t => t).该块将同时用作队列和输出块.因此,我们不必处理任何同步.

最后一块拼图实际上是并行处理物品.为此,我们可以使用另一个ActionBlock,这次用MaxDegreeOfParallelism设置.它将采取输入,处理它,并将正确的任务的结果设置在队列中.

放在一起,可能看起来像这样:

public static IPropagatorBlock<TInput,TOutput>
    CreateConcurrentOrderedTransformBlock<TInput,TOutput>(
    Func<TInput,TOutput> transform)
{
    var queue = new TransformBlock<Task<TOutput>,TOutput>(t => t);

    var processor = new ActionBlock<Tuple<TInput,Action<TOutput>>>(
        tuple => tuple.Item2(transform(tuple.Item1)),new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    var enqueuer = new ActionBlock<TInput>(
        async item =>
        {
            var tcs = new taskcompletionsource<TOutput>();
            await processor.SendAsync(
                new Tuple<TInput,Action<TOutput>>(item,tcs.SetResult));
            await queue.SendAsync(tcs.Task);
        });

    enqueuer.Completion.ContinueWith(
        _ =>
        {
            queue.Complete();
            processor.Complete();
        });

    return DataflowBlock.Encapsulate(enqueuer,queue);
}

经过这么多的谈话,我认为这是相当少量的代码.

看来你关心性能很多,所以你可能需要微调这个代码.例如,将处理器块的MaxDegreeOfParallelism设置为Environment.ProcessorCount可能是有意义的,以避免超额订阅.此外,如果延迟比您的吞吐量更重要,将相同块的MaxMessagesPerTask设置为1(或另一个小数)可能是有意义的,以便在处理项目完成时立即将其发送到输出.

此外,如果要限制收到的项目,您可以设置enqueer的BoundedCapacity.

原文地址:https://www.jb51.cc/csharp/95916.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐