BlockingCollection 中的多个消费者是否同时处理?

如何解决BlockingCollection 中的多个消费者是否同时处理?

我有一个 BlockingCollection,我从一个线程写入并从另一个线程读取。 生产者线程获取从服务器接收的项目并将它们添加到 BlockingCollection,而读取线程尝试清空 BlockingCollection 并处理它们。

问题我想批量清空队列,因为一一处理会太慢。但是当它不断被写入(数千个项目)时,消费者线程会一直读取它们直到它被清空,这意味着在写入完成之前处理甚至不会开始。

现在,消费者中的处理可以并行完成,所以我一直在想如何去做。

目前我有两个想法:

  1. 从消费者的 BlockingCollection 中读取一定数量的项目后,启动一个新的并行作业来处理它们,而不是等待完全清空队列然后开始处理。

  2. 使用多个使用者,并希望它们并行运行,而不是在尝试同时读取 BlockingCollection 时不断相互阻塞。

所以我的问题是关于选项 2 - BlockingCollection 是否针对这种情况进行了内部优化?它会划分读取的区域,还是消费者会为每个项目而争吵?如果是这样,那么选项 1 是否更好?

解决方法

添加另一种选择:(绝不是生产就绪的!)

这利用了 TPL 的数据流,其中 BatchBlock<T> 为我们抽象了批处理。

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class HoneyBatcher
{
    private const int BATCHSIZE = 10; // Find the size that works best for you.
    private readonly BatchBlock<Honey> batchBlock;
    private readonly ExecutionDataflowBlockOptions _options = 
                     new ExecutionDataflowBlockOptions()
    {
         // I'd start with 1,then benchmark if higher number actually benefits.
         MaxDegreeOfParallelism = 1,SingleProducerConstrained = true // if so,may micro-optimize throughput
    };
                       // vv Whatever process you want done on a batch
    public HoneyBatcher( Action<Honey[]> batchProcessor )
    {
        // BatchBlock does the batching
        // and is the entrypoint to the pipline.
        batchBlock = new BatchBlock<Honey>(BATCHSIZE);
        // processorBlock processes each batch that batchBlock will produce
        // Parallel executions as well as other tweaks can be configured through options.
        ActionBlock<Honey[]> processorBlock = 
                             new ActionBlock<Honey[]>(batchProcessor,_options);
        // build the pipline
        batchBlock.LinkTo(processorBlock);
        // item => batchBlock => item[BATCHSIZE] => batchProcessor(item[])
    }

    // Add item individually and have them batched up
    // and processed in a pipeline.
    public Task<bool> ProcessAsync(Honey item)
    {
        return batchBlock.SendAsync(item);
        // Can also be done with sync API.
    }
}

public class Honey 
{
    // Just a dummy
}

请注意,上面的代码片段只是想法的粗略布局。在生产中,您当然会解决错误处理、完成等问题。

,

批量处理项目的自然方法是将它们分批插入BlockingCollection 中,而不是稍后分批尝试删除 .换句话说,您可以使用 BlockingCollection<T[]> 而不是 BlockingCollection<T>。生产者线程可以使用 Queue<T>:

轻松完成批处理
var queue = new Queue<T>;
while (someCondition)
{
    var item = ProduceItem();
    queue.Enqueue(item);
    if (queue.Count == batchSize)
    {
        blockingCollection.Add(queue.ToArray());
        queue.Clear();
    }
}
if (queue.Count > 0)
{
    blockingCollection.Add(queue.ToArray());
    queue.Clear();
}
blockingCollection.CompleteAdding();

根据情况,您还可以使用一些 LINQ 风格的运算符,例如 Batch 库中的 MoreLinq

最后,回答您的主要问题,是的,BlockingCollection 类可以出色地处理多个消费者和多个生产者。如果集合为空,所有消费者都会被阻塞,当一个项目到达时,它会被交给一个等待的消费者。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?