微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如果X分钟内没有新项目进入Channel的方式,如何读取Channel中剩余数量小于批量的项目

如何解决如果X分钟内没有新项目进入Channel的方式,如何读取Channel中剩余数量小于批量的项目

我正在使用Channel中的system.threading.channels,并且想批量读取项目(5个项目),并且我有如下方法

public class Batcher
{
    private readonly Channel<Measurementviewmodel> _channel;
    public Batcher()
    {
        _channel = Channel.CreateUnbounded<Measurementviewmodel>();
    }
    public async Task<Measurementviewmodel[]> ReadBatchAsync(int batchSize,CancellationToken stoppingToken)
    {
        var result = new Measurementviewmodel[batchSize];

        for (var i = 0; i < batchSize; i++)
        {
            result[i] = await _channel.Reader.ReadAsync(stoppingToken);
        }

        return result;
    }
}

在asp.net核心后台服务中,我像下面这样使用它,

public class WriterService : BackgroundService
{
    private readonly Batcher _batcher;
    public WriterService(Batcher batcher)
    {
        _batcher = batcher;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var batchOfItems = await _batcher.ReadBatchAsync(5,stoppingToken);

            var range = string.Join(',',batchOfItems.Select(item => item.Value));

            var x = range;
        }
    }
}

这是可行的,只要Channel中有5个项目,我就会得到range

问题是,当Channel中仅剩2个项目,并且自最近10分钟以来没有项目进入Channel时,如何读取Channel中其余2个项目? / p>

解决方法

您可以创建一个linked CancellationTokenSource,以便同时监视外部取消请求和内部引起的超时。下面是通过为ChannelReader类创建ReadBatchAsync扩展方法来使用此技术的示例:

public static async ValueTask<T[]> ReadBatchAsync<T>(
    this ChannelReader<T> channelReader,int batchSize,TimeSpan timeout,CancellationToken cancellationToken = default)
{
    // Arguments validation omitted
    var items = new List<T>(batchSize);
    using (var linkedCTS
        = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
    {
        linkedCTS.CancelAfter(timeout);
        while (true)
        {
            var token = items.Count == 0 ? cancellationToken : linkedCTS.Token;
            T item;
            try
            {
                item = await channelReader.ReadAsync(token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                cancellationToken.ThrowIfCancellationRequested();
                break; // The cancellation was induced by timeout (ignore it)
            }
            catch (ChannelClosedException)
            {
                if (items.Count == 0) throw;
                break;
            }
            items.Add(item);
            if (items.Count >= batchSize) break;
        }
    }
    return items.ToArray();
}

此方法将在经过指定的timeout之后立即生成一个批处理,或者如果达到了batchSize,则将更快地生成一个批处理,前提是该批处理至少包含一项。否则,它将在收到第一件物品后立即产生单件物品。

如果通过调用channel.Writer.Complete()方法完成了频道,并且其中不包含更多项目,则ReadBatchAsync方法将传播与本地{{ 1}}方法。

如果外部ReadAsync被取消,则通过抛出CancellationToken来传播取消。这是大多数可取消API的ChannelClosedException

用法示例:

OperationCanceledException

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。