微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

发送到 IAsyncEnumerable<bytes> 的 `Stream` 的实现

如何解决发送到 IAsyncEnumerable<bytes> 的 `Stream` 的实现

在我的 .Net Core 应用程序中,第三方库中有一个方法可以写入 forumla1 <- as.formula(Sepal.Length ~ Petal.Length) compute_cooks_models <- function(x,eq){ cooks.distance(lm(eq,data = x$train,na.action = na.exclude)) } result <- sapply(datasets,compute_cooks_models,eq=forumla1) 接口(它将流接口作为参数并写入它),但我希望该数据转到我的数据源期望数据为 System.IO.Stream 流。我开始编写实现 IAsyncEnumerable<bytes> 接口的代码,因此当调用 Stream 时它会写入 Write(),然后认为“这之前一定已经完成了” - 似乎是通用的目的使用。

那么在 3rd 方库中是否有标准实现,或者我缺少任何“巧妙的技巧”?

解决方法

这是一个自定义的 Stream 实现,用于异步生产者-消费者场景。它是一个可写流,只能通过特殊的 GetConsumingEnumerable 方法读取(消耗)它。

public class ProducerConsumerStream : Stream
{
    private readonly Channel<byte> _channel;

    public ProducerConsumerStream(bool singleReader = true,bool singleWriter = true)
    {
        _channel = Channel.CreateUnbounded<byte>(new UnboundedChannelOptions()
        {
            SingleReader = singleReader,SingleWriter = singleWriter
        });
    }

    public override bool CanRead { get { return false; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override void Flush() { }

    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    public override long Seek(long offset,SeekOrigin origin)
        => throw new NotSupportedException();

    public override void SetLength(long value)
        => throw new NotSupportedException();

    public override int Read(byte[] buffer,int offset,int count)
        => throw new NotSupportedException();

    public override void Write(byte[] buffer,int count)
    {
        if (buffer == null) throw new ArgumentNullException(nameof(buffer));
        if (offset < 0) throw new ArgumentOutOfRangeException(nameof(offset));
        if (count < 0) throw new ArgumentOutOfRangeException(nameof(count));
        if (offset + count > buffer.Length)
            throw new ArgumentOutOfRangeException(nameof(count));

        for (int i = offset; i < offset + count; i++)
            _channel.Writer.TryWrite(buffer[i]);
    }

    public override void WriteByte(byte value)
    {
        _channel.Writer.TryWrite(value);
    }

    public override void Close()
    {
        base.Close();
        _channel.Writer.Complete();
    }

    public IAsyncEnumerable<byte> GetConsumingEnumerable(
        CancellationToken cancellationToken = default)
    {
        return _channel.Reader.ReadAllAsync(cancellationToken);
    }
}

此实现基于 Channel<byte>。如果您不熟悉频道,这里有教程 here

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?