微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

在OperationCanceledException之后ChannelReader完成任务永远不会完成

如何解决在OperationCanceledException之后ChannelReader完成任务永远不会完成

如果我调用 Stop()OperationCanceledException 发生并且 _writer.TryComplete(exp) 为真。但 _reader.Completion Task 仍未完成。

这是频道的理想行为吗?如果是的话,有人能告诉我如何停止 Channel 而不必等到它为空并使其 Completion Task 处于 Completed 状态吗?

public interface IItem
{
    Uri SourceUri { get; }

    string TargetPath { get; }
}

public class Item : IItem
{
    public Item(Uri sourceUri,string targetPath)
    {
        SourceUri = sourceUri;
        TargetPath = targetPath;
    }

    public Uri SourceUri { get; }

    public string TargetPath { get; }
}

public class TestService
{
    private readonly ChannelWriter<IItem> _writer;
    private readonly ChannelReader<IItem> _reader;

    private readonly CancellationTokenSource _cts;

    public TestService()
    {
        _cts = new CancellationTokenSource();
        Channel<IItem> channel = Channel.CreateUnbounded<IItem>();
        _reader = channel.Reader;
        _writer = channel.Writer;
    }

    public async Task QueueDownload(IItem information)
    {
        await _writer.WriteAsync(information);
    }

    public void StartDownload()
    {
        Task.Factory.StartNew(async () =>
        {
            await ProcessDownloadAsync();
        },TaskCreationoptions.LongRunning);
    }

    public void Stop()
    {
        _cts.Cancel();
        //_writer.Complete();
        //_writer = null;
        Console.WriteLine("Stop");
    }

    public async Task Wait()
    {
        await _reader.Completion;
    }

    private async Task ProcessDownloadAsync()
    {
        try
        {
            while (await _reader.WaitToReadAsync(_cts.Token))
            {
                IItem information = await _reader.ReadAsync(_cts.Token);
                using (WebClient webClient = new WebClient())
                {
                    Console.WriteLine(information.TargetPath);
                    await webClient.DownloadFileTaskAsync(information.sourceUri,information.TargetPath);
                }
            }
        }
        catch (OperationCanceledException exp)
        {
            bool res = _writer.TryComplete(exp);
        }

    }
}

static class Program
{
    static async Task Main(string[] args)
    {
        TestService tSvc = new TestService();
        await tSvc.QueueDownload(new Item(new Uri(@"https://images.pexels.com/" +
            @"photos/753626/pexels-photo-753626.jpeg"),@"D:\\Temp\1.png"));
        await tSvc.QueueDownload(new Item(new Uri(@"https://images.pexels.com/" +
            @"photos/753626/pexels-photo-753626.jpeg"),@"D:\\Temp\1.png"));

        tSvc.StartDownload();
        Task t = tSvc.Wait();
        tSvc.Stop();

        await t;

        Console.WriteLine("Finished");
    }
}

解决方法

ChannelWriter.Complete 方法的行为与人们预期的略有不同。它不会立即使频道的内容无效。相反,它只是阻止在频道中添加更多项目。现有项目仍可用于消费,并且 ChannelReader.Completion 属性不会在所有存储项目消费完之前完成。

下面的示例演示了这种行为:

var channel = Channel.CreateUnbounded<int>();
channel.Writer.TryWrite(1);
channel.Writer.Complete(new FileNotFoundException());
//channel.Reader.TryRead(out var data);
var completed = channel.Reader.Completion.Wait(500);
Console.WriteLine($"Completion: {(completed ? "OK" : "Timed-out")}");

输出:

Completion: Timed-out

您可以取消注释 channel.Reader.TryRead 行,以查看出现的 FileNotFoundException

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。