c# – 具有固定大小FIFO队列的生产者/消费者模式

我需要在固定大小的FIFO队列周围实现生产者/消费者模式.我认为围绕ConcurrentQueue的包装类可能适用于此,但我不完全确定(我之前从未使用过ConcurrentQueue).这种扭曲是队列只需要保存固定数量的项目(在我的例子中是字符串).我的应用程序将有一个生产者任务/线程和一个消费者任务/线程.当我的消费者任务运行时,它需要在那个时刻出现队列中存在的所有项目并对其进行处理.

对于它的价值,我的消费者处理排队的项目只不过是通过SOAP将它们上传一个不是100%可靠的网络应用程序.如果无法建立连接或调用SOAP调用失败,我应该丢弃这些项目并返回队列以获取更多信息.由于SOAP的开销,我试图最大化队列中可以在一次SOAP调用中发送的项目数.

有时,我的制作人可能会比我的消费者能够删除和处理它们更快地添加项目.如果队列已满并且我的生产者需要添加一个项目,我需要将新项目排队,然后将最旧的项目出列,以便队列的大小保持固定.基本上,我需要始终保留队列中生成的最新项目(即使这意味着某些项目不会被消耗,因为我的消费者当前正在处理以前的项目).

关于生产者如果队列中的项目是固定的那样保留数字,我从这个问题中发现了一个潜在的想法:

Fixed size queue which automatically dequeues old values upon new enques

我目前在ConcurrentQueue周围使用一个包装类(基于该答案)和Enqueue()方法,如下所示:

public class FixedSizeQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizeQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        // add item to the queue
        queue.Enqueue(obj);

        lock (this) // lock queue so that queue.Count is reliable
        {
            while (queue.Count > Size) // if queue count > max queue size,then dequeue an item
            {
                T objOut;
                queue.TryDequeue(out objOut);
            }
        }
    }
}

我创建了这个类的实例,对队列有一个大小限制,如下所示:

FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit

我启动了我的生产者任务,它开始填充队列.我的Enqueue()方法中的代码似乎在添加项目时从队列中删除最旧的项目时正常工作导致队列计数超过最大大小.现在我需要我的消费者任务来将项目出列并处理它们,但这就是我的大脑混淆的地方.为我的消费者实现Dequeue方法的最佳方法是什么,它将在某个时刻获取队列的快照并将所有项目出列以进行处理(生产者可能仍然在此过程中将项目添加到队列中)?

解决方法

简单地说,ConcurrentQueue有一个“ToArray”方法,当输入时,它将锁定集合并生成队列中所有当前项的“快照”.如果您希望为您的消费者提供一系列工作,您可以锁定入队方法所具有的相同对象,调用ToArray(),然后旋转一段时间(!queue.IsEmpty)queue.TryDequeue(out trash )循环以清除队列,然后返回您提取的数组.

这将是你的GetAll()方法

public T[] GetAll()
{
    lock (syncObj) // so that we don't clear items we didn't get with ToArray()
    {
        var result = queue.ToArray();
        T trash;
        while(!queue.IsEmpty) queue.TryDequeue(out trash);
    }
}

由于你必须清除队列,你可以简单地组合这两个操作;创建一个适当大小的数组(使用queue.Count),然后在队列不为空时,在返回之前将一个项目出列并将其放入数组中.

现在,这是具体问题的答案.我现在必须在良心上穿上我的CodeReview.SE帽子并指出一些事情:

>永远不要使用锁(这个).您永远不会知道其他对象可能将您的对象用作锁定焦点,因此当对象从内部锁定自身时会被阻止.最佳做法是锁定一个私有范围的对象实例,通常只创建一个被锁定的对象:private readonly object syncObj = new object();
>因为无论如何你都要锁定包装器的关键部分,我会使用普通的List< T>而不是并发集合.访问速度更快,更容易清理,因此您可以比ConcurrentQueue允许的更简单地完成您所做的工作.要排队,请在索引零之前锁定同步对象Insert(),然后使用RemoveRange()从索引大小中删除任何项目到列表的当前计数.要出列,请锁定同一个同步对象,调用myList.ToArray()(来自Linq命名空间;与ConcurrentQueue完全相同),然后在返回数组之前调用myList.Clear().不能简单:

public class FixedSizeQueue<T>
{
private readonly List<T> queue = new List<T>();
private readonly object syncObj = new object();

public int Size { get; private set; }

public FixedSizeQueue(int size) { Size = size; }

public void Enqueue(T obj)
{
    lock (syncObj)
    {
        queue.Insert(0,obj)
        if(queue.Count > Size) 
           queue.RemoveRange(Size,Count-Size);
    }
}

public T[] Dequeue()
{
    lock (syncObj)
    {
        var result = queue.ToArray();
        queue.Clear();
        return result;
    }
}
}

>您似乎明白使用此模型将您排队的物品扔掉了.这通常不是一件好事,但我愿意给你怀疑的好处.但是,我会说使用BlockingCollection有一种无损的方法来实现这一点. BlockingCollection包装任何IProducerConsumerCollection,包括大多数System.Collections.Concurrent类,并允许您指定队列的最大容量.然后,该集合将阻止任何尝试从空队列或任何尝试添加到完整队列的线程出队的线程,直到已添加删除项目,以便有可能获得某些内容或插入空间.这是实现具有最大大小的生产者 – 消费者队列的最佳方式,或者是否需要“轮询”以查看是否有供消费者使用的内容的最佳方式.如果你走这条路,只有消费者扔掉的东西才会被扔掉;消费者将看到生产者投入的所有行,并对每个行做出自己的决定.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


原文地址:http://msdn.microsoft.com/en-us/magazine/cc163791.aspx 原文发布日期: 9/19/2005 原文已经被 Microsoft 删除了,收集过程中发现很多文章图都不全,那是因为原文的图都不全,所以特收集完整全文。 目录 前言 CLR启动程序
前言 随着近些年微服务的流行,有越来越多的开发者和团队所采纳和使用,它的确提供了很多的优势也解决了很多的问题,但是我们也知道也并不是银弹,提供优势的同时它也给我们的开发人员和团队也带来了很多的挑战。 为了迎接或者采用这些新技术,开发团队需要更加注重一些流程或工具的使用,这样才能更好的适应这些新技术所
最近因为比较忙,好久没有写博客了,这篇主要给大家分享一下PLINQ中的分区。上一篇介绍了并行编程,这边详细介绍一下并行编程中的分区和自定义分区。 先做个假设,假设我们有一个200Mb的文本文件需要读取,怎么样才能做到最优的速度呢?对,很显然就是拆分,把文本文件拆分成很多个小文件,充分利用我们计算机中
在多核CPU在今天和不久的将来,计算机将拥有更多的内核,Microsoft为了利用这个硬件特性,于是在Visual Studio 2010 和 .NET Framework 4的发布及以上版本中,添加了并行编程这个新特性,我想它以后势必会改变我们的开发方式。 在以前或者说现在,我们在并行开发的时候可
c语言输入成绩怎么判断等级
字符型数据在内存中的存储形式是什么
c语言怎么求字符串的长度并输出
c语言函数的三种调用方式是什么
c语言中保留两位小数怎么表示
double的输入格式符是什么
长整型输出格式是什么
C语言中文件包含的命令关键字是什么
c程序如何编写x的y次方
c语言开根号代码是什么
c语言怎么进行字符串比较
c语言怎么进行强制类型转换
c语言运算符的优先级顺序是什么
c++用什么软件编程
中序遍历是怎么遍历的
h文件和c文件的关系是什么