微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

无法将项目排队到抽象类中使用的BlockingCollection中

如何解决无法将项目排队到抽象类中使用的BlockingCollection中

正如标题所示,我利用抽象类来创建可重用的基类,以使用BlockingCollection添加删除项目。我使用生产者/使用者模式使项目入队和出队,抽象类是通用的,因此我可以为队列指定一个对象。

当我调用基类的Enqueue和Dequeue方法时,出现了我的问题,它们似乎引用了队列的另一个实例,即,当我调用dequeue时,队列中没有任何项目,并且队列TryAdd()返回true。但是,当我直接在队列实例上调用Add或Take时,效果很好。

谁能告诉我为什么基类的Enqueue和Dequeue方法不能按我的预期工作,我试图查找BlockingCollections的几种用法,却看不到为什么。

Program.cs

static void Main(string[] args)
{

    var processor = new Processor(1,4);

    Console.ReadKey();
}

抽象基类

public abstract class BaseProcessor<T>: Idisposable
{
    protected BlockingCollection<T> _queue;
    private CancellationTokenSource _tokenSource;
    private int _producers;
    private int _consumers;
    private List<Task> _tasks;

    public BaseProcessor(int producers,int consumers)
    {
        _queue = new BlockingCollection<T>();
        _tokenSource = new CancellationTokenSource();
        _producers = producers;
        _consumers = consumers;
        _tasks = new List<Task>();
    }

    protected void Setup()
    {
        Parallel.For(0,_producers,i =>
            _tasks.Add(Task.Factory.StartNew(() => Produce(_tokenSource.Token),_tokenSource.Token)
            .ContinueWith((task) =>
            {
                Console.WriteLine("Task {0} stopped",task.Id);
            }))
         );

        Parallel.For(0,i =>
            _tasks.Add(Task.Factory.StartNew(() => Consume(_tokenSource.Token),task.Id);
            }))
         );
    }

    protected abstract void Produce(CancellationToken token);
    protected abstract void Consume(CancellationToken token);

    protected void Enqueue(T item)
    {
        try
        {
            var res = _queue.TryAdd(item,TimeSpan.FromSeconds(1));
        }
        catch (Exception ex)
        {
            Console.WriteLine("Could not add item to queue: {0}",ex);
        }
    }

    protected void Enqueue(List<T> items)
    {
        items.ForEach(o => Enqueue(o));
    }

    protected T Dequeue()
    {
        try
        {
            T item;
            while (_queue.TryTake(out item,TimeSpan.FromSeconds(1))) ;
            return item;
        }
        catch (Exception ex)
        {
            Console.WriteLine("Could not remove item to queue: {0}",ex);
            return default(T);
        }
    }


    public void dispose()
    {
        // Cancel all tokens
        _tokenSource.Cancel();

        // Wait for all tasks complete
        Task.WaitAll(_tasks.ToArray());

        _queue.dispose();
    }
}

抽象类的实现

public class Processor: BaseProcessor<QueueItem>
{

    private int _counter;

    public Processor(int producers,int consumers): base(producers,consumers)
    {
        _counter = 0;

        Setup();
    }

    protected override void Produce(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {

            var queueItem = new QueueItem()
            {
                Id = _counter,Timestamp = DateTime.Now
            };

            //Enqueue(queueItem);
            _queue.Add(queueItem);

            Console.WriteLine("Enqueued: {0}",_counter);
            _counter++;

            Thread.Sleep(3000);
        }
    }

    protected override void Consume(CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            var item = Dequeue();
            //var item = _queue.Take();
            Console.WriteLine("Dequeued: {0}",item.Id);                
        }
    }
}

解决方法

我发现了原因,这是我使用此链接获取的一些代码的实现。 https://www.infoworld.com/article/3090215/how-to-work-with-blockingcollection-in-c.html

while(_queue.TryTake(out item,TimeSpan.FromSeconds(1)));正在从队列中转储所有项目,它一直运行到队列实际上为空。从本教程中,我认为它正在运行,并且有一些值得尝试的地方。

代表我盲目复制和粘贴问题

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。