微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何使用2个数据源返回异步流

如何解决如何使用2个数据源返回异步流

我具有以下函数,该函数以运行System.Diagnostics.Process的结果作为异步流返回标准输出数据。该方法中当前的所有操作均按预期进行。我可以在await foreach()循环中调用它,并获得由外部exe生成的每一行输出

private static async IAsyncEnumerable<string> ProcessAsyncStream (
    processstartinfo processstartinfo)
{
   // Ensure that process is destroyed when this method exits
   using var process = new Process() { StartInfo = processstartinfo };

   // Buffer used to pass data from event-handler back to this method
   BufferBlock<string> dataBuffer = new BufferBlock<string>();

   process.OutputDataReceived += (s,e) =>
   {
      if (e.Data is null)
      {
         dataBuffer.Complete();
      }
      else
      {
         dataBuffer.Post(e.Data);
      }
   };

   // Start process and redirect output streams
   process.Start();
   process.BeginoutputReadLine();

   // Return data line by line  
   while (await dataBuffer.OutputAvailableAsync())
      yield return dataBuffer.Receive();
}

我的问题是,现在我需要它同时返回标准输出和标准错误结果。我做了这个类来保存每个流中的数据。

public class ProcessData
{
   public string Error { get; set; } = "";
   public string Output { get; set; } = "";
}

并将ProcessAsyncStream()更改为如下形式

private static async IAsyncEnumerable<ProcessData> ProcessAsyncStream (
    processstartinfo processstartinfo)
{
   // Ensure that process is destroyed when this method exits
   using var process = new Process() { StartInfo = processstartinfo };

   // Buffer used to pass data from event-handlers back to this method
   BufferBlock<string> outputDataBuffer = new BufferBlock<string>();
   BufferBlock<string> errorDataBuffer = new BufferBlock<string>();

   
   process.OutputDataReceived += (s,e) =>
   {
      if (e.Data is null)
      {
         outputDataBuffer.Complete();
      }
      else
      {
         outputDataBuffer.Post(e.Data);
      }
   };

   process.ErrorDataReceived += (s,e) =>
   {
      if (e.Data is null)
      {
         errorDataBuffer.Complete();
      }
      else
      {
         errorDataBuffer.Post(e.Data);
      }
   };

   // Start process and redirect output streams
   process.Start();
   process.BeginoutputReadLine();

   // Return data line by line
   while (await outputDataBuffer.OutputAvailableAsync()
          || await errorDataBuffer.OutputAvailableAsync())
      yield return new ProcessData() 
      {
         Error = errorDataBuffer.Receive(),Output = outputDataBuffer.Receive()
      }
}

问题在于,如果一个缓冲区在方法挂起之前就完成了,因为该缓冲区的.Receive()没有任何数据要接收。如果将while条件更改为&&,则不会从另一个缓冲区获取所有数据。

有什么建议吗?

解决方法

关于实际问题,读取块的过程流程存在问题。最简单的解决方案是只使用带有多个生产者的单个缓冲区和单个消费者以及消息包

您尝试使用 DataFlow块解决的概念性问题是事件异步流的基本本质。推送事件,并提取异步流

有几种解决方案可以将它们映射在一起,尽管我认为最优雅的方法是使用 Unbounded Channel 作为 buffer

与DataFlow相比,渠道是更现代的方法,与BufferBlock相比,其自由度更低,更笨拙,并且非常轻巧且经过高度优化。另外,我只需要为不同的响应类型传递包装器

忽略任何其他问题(概念性或其他问题)。

给予

public enum MessageType
{
   Output,Error
}

public class Message
{
   public MessageType MessageType { get; set; }
   public string Data { get; set; }

   public Message(string data,MessageType messageType )
   {
      Data = data;
      MessageType = messageType;
   }
}

用法

private async IAsyncEnumerable<Message> ProcessAsyncStreamAsync(
     ProcessStartInfo processStartInfo,CancellationToken cancellationToken)
{
   using var process = new Process() { StartInfo = processStartInfo };

   var ch = Channel.CreateUnbounded<Message>();
   var completeCount = 0;

   void OnReceived(string data,MessageType type)
   {
      // The Interlocked memory barrier is likely overkill here
      if (data is null && Interlocked.Increment(ref completeCount) == 2)
         ch?.Writer.Complete();
      else
         ch?.Writer.WriteAsync(new Message(data,type),cancellationToken);
   }

   process.OutputDataReceived += (_,args) => OnReceived(args.Data,MessageType.Output);
   process.ErrorDataReceived += (_,MessageType.Error);

   // start the process 
   // ...

   await foreach (var message in ch.Reader
           .ReadAllAsync(cancellationToken)
           .ConfigureAwait(false))
      yield return message;

   // cleanup
   // ...
}

注意:完全未经测试

,

改为在退出时完成。

void HandleData(object sender,DataReceivedEventArgs e)
{
    if (e.Data != null) dataBuffer.Post(e.Data);
}

process.OutputDataReceived += HandleData;
process.ErrorDataReceived += HandleData;
process.Exited += (s,e) => 
{
    process.WaitForExit();
    dataBuffer.Complete();
};
,

您可以使用ProcessData个项目的单个缓冲区:

var buffer = new BufferBlock<ProcessData>();

然后,当两个事件都传播了Complete值时,使用自定义null机制来完成缓冲区:

process.OutputDataReceived += (s,e) =>
{
    if (e.Data is null) Complete(1);
        else buffer.Post(new ProcessData() { Output = e.Data });
};

process.ErrorDataReceived += (s,e) =>
{
    if (e.Data is null) Complete(2);
        else buffer.Post(new ProcessData() { Error = e.Data });
};

这是Complete方法的实现:

bool[] completeState = new bool[2];
void Complete(int index)
{
    bool completed;
    lock (completeState.SyncRoot)
    {
        completeState[index - 1] = true;
        completed = completeState.All(v => v);
    }
    if (completed) buffer.Complete();
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?