如何解决无法将项目排队到抽象类中使用的BlockingCollection中
正如标题所示,我利用抽象类来创建可重用的基类,以使用BlockingCollection添加和删除项目。我使用生产者/使用者模式使项目入队和出队,抽象类是通用的,因此我可以为队列指定一个对象。
当我调用基类的Enqueue和Dequeue方法时,出现了我的问题,它们似乎引用了队列的另一个实例,即,当我调用dequeue时,队列中没有任何项目,并且队列TryAdd()返回true。但是,当我直接在队列实例上调用Add或Take时,效果很好。
谁能告诉我为什么基类的Enqueue和Dequeue方法不能按我的预期工作,我试图查找BlockingCollections的几种用法,却看不到为什么。
Program.cs
static void Main(string[] args)
{
var processor = new Processor(1,4);
Console.ReadKey();
}
抽象基类
public abstract class BaseProcessor<T>: Idisposable
{
protected BlockingCollection<T> _queue;
private CancellationTokenSource _tokenSource;
private int _producers;
private int _consumers;
private List<Task> _tasks;
public BaseProcessor(int producers,int consumers)
{
_queue = new BlockingCollection<T>();
_tokenSource = new CancellationTokenSource();
_producers = producers;
_consumers = consumers;
_tasks = new List<Task>();
}
protected void Setup()
{
Parallel.For(0,_producers,i =>
_tasks.Add(Task.Factory.StartNew(() => Produce(_tokenSource.Token),_tokenSource.Token)
.ContinueWith((task) =>
{
Console.WriteLine("Task {0} stopped",task.Id);
}))
);
Parallel.For(0,i =>
_tasks.Add(Task.Factory.StartNew(() => Consume(_tokenSource.Token),task.Id);
}))
);
}
protected abstract void Produce(CancellationToken token);
protected abstract void Consume(CancellationToken token);
protected void Enqueue(T item)
{
try
{
var res = _queue.TryAdd(item,TimeSpan.FromSeconds(1));
}
catch (Exception ex)
{
Console.WriteLine("Could not add item to queue: {0}",ex);
}
}
protected void Enqueue(List<T> items)
{
items.ForEach(o => Enqueue(o));
}
protected T Dequeue()
{
try
{
T item;
while (_queue.TryTake(out item,TimeSpan.FromSeconds(1))) ;
return item;
}
catch (Exception ex)
{
Console.WriteLine("Could not remove item to queue: {0}",ex);
return default(T);
}
}
public void dispose()
{
// Cancel all tokens
_tokenSource.Cancel();
// Wait for all tasks complete
Task.WaitAll(_tasks.ToArray());
_queue.dispose();
}
}
抽象类的实现
public class Processor: BaseProcessor<QueueItem>
{
private int _counter;
public Processor(int producers,int consumers): base(producers,consumers)
{
_counter = 0;
Setup();
}
protected override void Produce(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
var queueItem = new QueueItem()
{
Id = _counter,Timestamp = DateTime.Now
};
//Enqueue(queueItem);
_queue.Add(queueItem);
Console.WriteLine("Enqueued: {0}",_counter);
_counter++;
Thread.Sleep(3000);
}
}
protected override void Consume(CancellationToken token)
{
while (!token.IsCancellationRequested)
{
var item = Dequeue();
//var item = _queue.Take();
Console.WriteLine("Dequeued: {0}",item.Id);
}
}
}
解决方法
我发现了原因,这是我使用此链接获取的一些代码的实现。 https://www.infoworld.com/article/3090215/how-to-work-with-blockingcollection-in-c.html
while(_queue.TryTake(out item,TimeSpan.FromSeconds(1)));正在从队列中转储所有项目,它一直运行到队列实际上为空。从本教程中,我认为它正在运行,并且有一些值得尝试的地方。
代表我盲目复制和粘贴问题
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。