微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

将流处理为 IAsyncEnumerable - 流不可读

如何解决将流处理为 IAsyncEnumerable - 流不可读

我有一个工作流程,我尝试执行以下操作:

  • 一个接受回调的方法,它在内部产生一个 Stream,该方法调用者可以使用回调以他们想要的任何方式处理 Stream
  • 在一种特殊情况下,调用者使用回调从流中生成 IAsyncEnumerable

我在下面创建了一个最小的复制示例:

class Program
{
    private static async Task<Stream> GetStream()
    {
        var text =
            @"Multi-line
            string";

        await Task.Yield();

        var bytes = Encoding.UTF8.GetBytes(text);
        return new MemoryStream(bytes);
    }

    private static async Task<T> StreamData<T>(Func<Stream,T> streamAction)
    {
        await using var stream = await GetStream();
        return streamAction(stream);
    }

    private static async Task StreamData(Func<Stream,Task> streamAction)
    {
        await using var stream = await GetStream();
        await streamAction(stream);
    }

    private static async IAsyncEnumerable<string> GetTextLinesFromStream(Stream stream)
    {
        using var reader = new StreamReader(stream);

        var line = await reader.ReadLineAsync();
        while (line != null)
        {
            yield return line;
            line = await reader.ReadLineAsync();
        }
    }

    private static async Task Test1()
    {
        async Task GetRecords(Stream str)
        {
            await foreach(var line in GetTextLinesFromStream(str))
                Console.WriteLine(line);
        }

        await StreamData(GetRecords);
    }

    private static async Task Test2()
    {
        await foreach(var line in await StreamData(GetTextLinesFromStream))
            Console.WriteLine(line);
    }

    static async Task Main(string[] args)
    {
        await Test1();
        await Test2();
    }
}  

在这里方法 Test1 工作正常,而 Test2 没有,失败了 Stream is not readable。问题在于,在第二种情况下,当代码开始处理实际流时,流已经被释放了。

大概这两个例子之间的区别在于,对于第一个例子,读取流是在仍处于一次性 stream 的上下文中时执行的,而在第二个例子中,我们已经出局了。

但是,我认为第二种情况也可能有效 - 至少我觉得它非常符合 C# 习惯。我还缺少什么让第二个案例也能正常工作吗?

解决方法

Test2 方法的问题在于 Stream 在创建 IAsyncEnumerable<string> 时被释放,而不是在其枚举完成时释放。

Test2 方法使用第一个 StreamData 重载,即返回 Task<T> 的重载。在这种情况下,T 是一个 IAsyncEnumerable<string>。因此,StreamData 方法返回一个生成异步序列的任务,然后立即处理流(在生成序列之后)。显然,这不是处理流的合适时机。正确的时刻应该是在 await foreach 循环完成之后。

为了使 Test2 透明地工作,您应该添加 StreamData 方法的第三个重载,该方法返回 Task<IAsyncEnumerable<T>>(而不是 Task 或 {{1 }})。此重载应返回一个与一次性资源相关联的专用异步序列,并在其枚举完成后处理此资源。下面是这样一个序列的实现:

Task<T>

您可以像这样在 public class AsyncEnumerableDisposable<T> : IAsyncEnumerable<T> { private readonly IAsyncEnumerable<T> _source; private readonly IAsyncDisposable _disposable; public AsyncEnumerableDisposable(IAsyncEnumerable<T> source,IAsyncDisposable disposable) { // Arguments validation omitted _source = source; _disposable = disposable; } async IAsyncEnumerator<T> IAsyncEnumerable<T>.GetAsyncEnumerator( CancellationToken cancellationToken) { await using (_disposable.ConfigureAwait(false)) await foreach (var item in _source .WithCancellation(cancellationToken) .ConfigureAwait(false)) yield return item; } } 方法中使用它:

StreamData

请记住,通常一个 private static async Task<IAsyncEnumerable<T>> StreamData<T>( Func<Stream,IAsyncEnumerable<T>> streamAction) { var stream = await GetStream(); return new AsyncEnumerableDisposable<T>(streamAction(stream),stream); } 在其生命周期内可以被枚举多次,通过将它包装到一个 IAsyncEnumerable<T> 中,它本质上被简化为一个单一的枚举序列(因为资源将在第一次枚举后处理)。


替代方案: System.Interactive.Async 包包含 AsyncEnumerableEx.Using 运算符,可用于代替自定义 AsyncEnumerableDisposable<T> 类:

AsyncEnumerableDisposable

不同之处在于 private static async Task<IAsyncEnumerable<T>> StreamData<T>( Func<Stream,IAsyncEnumerable<T>> streamAction) { var stream = await GetStream(); return AsyncEnumerableEx.Using(() => stream,streamAction); } 将通过其 Stream 方法同步处理。 AFAICS 不支持在此包中处理 Dispose

这是 IAsyncDisposable 方法的签名:

AsyncEnumerableEx.Using

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?