微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

c# – 如何聚合来自异步生成器的数据并将其写入文件?

我正在学习C#中的异步/等待模式.目前我正在尝试解决这样的问题:

>有一个生产者(硬件设备)每秒生成1000个数据包.我需要将此数据记录到文件中.
>设备只有一个ReadAsync()方法一次报告一个数据包.
>我需要缓冲数据包并按照它们生成的顺序将它们写入文件,每秒只执行一次.
>如果写入过程没有在下一批数据包准备好写入时及时完成,则写操作应该失败.

到目前为止,我写了类似下面的内容.它有效,但我不确定这是否是解决问题的最佳方法.有任何意见或建议吗?在消费者需要汇总从生产者处收到的数据时,采用这种生产者/消费者问题的最佳做法是什么?

static async Task TestLogger(Device device,int seconds)
{
    const int bufLength = 1000;
    bool firstIteration = true;
    Task writerTask = null;

    using (var writer = new StreamWriter("test.log")))
    {
        do
        {
            var buffer = new byte[bufLength][];

            for (int i = 0; i < bufLength; i++)
            {
                buffer[i] = await device.ReadAsync();
            }

            if (!firstIteration)
            {
                if (!writerTask.IsCompleted)
                    throw new Exception("Write Time Out!");
            }

            writerTask = Task.Run(() =>
                {
                    foreach (var b in buffer)
                        writer.WriteLine(ToHexString(b));
                });

            firstIteration = false;
        } while (--seconds > 0);
    }
}

解决方法

您可以使用以下想法,前提是flush的条件是数据包的数量(最多1000个).我没有测试它.它利用了 this question中Stephen Cleary的 AsyncProducerConsumerQueue<T>特色.
AsyncProducerConsumerQueue<byte[]> _queue;
Stream _stream;

// producer
async Task ReceiveAsync(CancellationToken token)
{
    while (true)
    {
       var list = new List<byte>();
       while (true)
       {
           token.ThrowIfCancellationRequested(token);
           var packet = await _device.ReadAsync(token);
           list.Add(packet);
           if (list.Count == 1000)
               break;
       }
       // push next batch
       await _queue.EnqueueAsync(list.ToArray(),token);
    }
}

// consumer
async Task LogAsync(CancellationToken token)
{
    Task prevIoUsFlush = Task.Fromresult(0); 
    CancellationTokenSource cts = null;
    while (true)
    {
       token.ThrowIfCancellationRequested(token);
       // get next batch
       var nextBatch = await _queue.DequeueAsync(token);
       if (!prevIoUsFlush.IsCompleted)
       {
           cts.Cancel(); // cancel the prevIoUs flush if not ready
           throw new Exception("Failed to flush on time.");
       }
       await prevIoUsFlush; // it's completed,observe for any errors
       // start flushing
       cts = CancellationTokenSource.CreateLinkedTokenSource(token);
       prevIoUsFlush = _stream.WriteAsync(nextBatch,nextBatch.Count,cts.Token);
    }
}

如果您不想让记录器失败,而是希望取消刷新并继续下一批,则可以对此代码进行最小的更改.

回应@ l3arnon评论

  1. A packet is not a byte,it’s byte[]. 2. You haven’t used the OP’s ToHexString. 3. AsyncProducerConsumerQueue is much less robust and
    tested than .Net’s TPL Dataflow. 4. You await prevIoUsFlush for errors
    just after you throw an exception which makes that line redundant.
    etc. In short: I think the possible added value doesn’t justify this
    very complicated solution.

>“数据包不是字节,它是字节[]” – 数据包是一个字节,这从OP的代码中很明显:buffer [i] = await device.ReadAsync().然后,一批数据包是byte [].
>“你还没有使用OP的ToHexString.” – 目标是展示如何使用Stream.WriteAsync本身接受取消令牌,而不是WriteLineAsync,它不允许取消.将ToHexString与Stream.WriteAsync一起使用并仍然利用取消支持是微不足道的:

var hexBytes = Encoding.ASCII.GetBytes(ToHexString(nextBatch) + 
    Environment.NewLine);
_stream.WriteAsync(hexBytes,hexBytes.Length,token);

>“AsyncProducerConsumerQueue的稳健性和测试性都不如.Net的TPL数据流” – 我不认为这是一个确定的事实.但是,如果OP关心它,他可以使用常规的BlockingCollection,它不会阻塞生产者线程.在等待下一批时阻止使用者线程是可以的,因为写入是并行完成的.与此相反,您的TPL Dataflow版本带有一个冗余cpu和锁密集操作:使用logAction.Post(数据包)将数据从生产者管道移动到写入器pipleline,逐字节.我的代码不这样做.>“在抛出导致该行冗余的异常之后,您等待prevIoUsFlush出错.” – 这条线不是多余的.也许,你错过了这一点:当prevIoUsFlush.IsFaulted或prevIoUsFlush.IsCancelled也为true时,prevIoUsFlush.IsCompleted可能为true.因此,等待prevIoUsFlush与之相关,以观察已完成任务上的任何错误(例如,写入失败),否则将丢失.

原文地址:https://www.jb51.cc/csharp/99471.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐