从外部对 BackgroundService 中的取消做出反应

如何解决从外部对 BackgroundService 中的取消做出反应

如果我在调试模式下在 .NET Core BackgroundService 中启动以下示例:

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (!stoppingToken.IsCancellationRequested)
    {
        Task.Run(async () => await Task.Delay(30000,stoppingToken))
            .Wait(stoppingToken);
    }
}

Ctrl + C 取消事件不会调用StopAsync() 调用 Cancel()CancellationTokenSource 方法

我认为我的问题类似于 this 帖子。

当我在 ExecuteAsync 中使用阻塞方法时,如何捕获这些取消?

ps:在现实世界中,我的 ExecuteAsync 正在监视文件系统,直到在我的目的地创建新文件。为了实现这种行为,我使用了 FileSystemWatcher.WaitForChanged() 方法

解决方法

从评论来看,问题似乎与线程无关。真正的问题是如何停止 FileSystemWatcher。

您不需要额外的线程与 FileSystemWatcher,您需要尽快处理其更改事件。您可以为此使用异步事件处理程序,或者更好的是,将事件快速发布到队列或通道进行处理。

要停止 FSW,您可以使用 CancellationToken.Register 方法将 EnableRaisingEvents 设置为 false :

stoppingToken.Register(()=>watcher.EnableRaisingEvents=false);

事件处理

为了快速处理事件,可以将 FileSystemEventArgs 值直接发布到队列或 Channel 中,并使用其他任务处理它们。这有两个好处:

  • 尽可能快地处理文件事件,因此不会丢失任何文件
  • 代码可以等待所有事件完成,也可以取消它们。
var channel=Channel.CreateUnbounded<FileSystemEventArgs>();

stoppingToken.Register(()=>{
    watcher.EnableRaisingEvents=false;
    channel.Writer.TryComplete();
});

watcher.Changed+=(o,e)=>channel.Writer.WriteAsync(e,stoppingToken);

await foreach(var e in channel.Reader.ReadAllAsync(stoppingToken))
{
    //do something
}

一个Channel可以看成是一个异步读写操作的队列。 ReadAllAsync 方法使消息出列直到停止并将它们作为 IAsyncEnumerable 返回,这允许使用 await foreach 轻松地异步处理项目。

管道和渠道

代码可以重构为:

await watcher.AsChannel(stoppingToken)
             .ProcessEvents(stoppingToken);

消费者

很容易将订阅者代码提取到单独的方法中。这甚至可以是一个扩展方法:

public static async Task ProcessEvents(this ChannelReader<FileSystemEventArgs> reader,CancellationToken stoppingToken)
{
    await foreach(var e in channel.Reader.ReadAllAsync(stoppingToken))
    {
        //do something
    }
}

并称之为:

var channel=Channel.CreateUnbounded<FileSystemEventArgs>();

stoppingToken.Register(()=>{
    watcher.EnableRaisingEvents=false;
    channel.Writer.TryComplete();
});

watcher.Changed+=(o,stoppingToken);

await ProcessEvents(channel,stoppingToken);

这是有效的,因为 Channelimplicit cast operatorsChannelReaderChannelWriter

ChannelReader 支持多个消费者,因此可以使用多个任务来处理事件,例如:

public static async Task ProcessEvents(this ChannelReader<FileSystemEventArgs> reader,int dop,CancellationToken stoppingToken)
{
    var tasks=Enumerable.Range(0,dop).Select(()=>{
        await foreach(var e in channel.Reader.ReadAllAsync(stoppingToken))
        {
            //do something
        }
    });
    await Task.WhenAll(tasks);
}

制作人

也可以将频道创建和发布提取到单独的方法中。毕竟,我们只需要 ChannelReader 进行处理:

public static ChannelReader AsChannel(this FileSystemWatcher watcher,CancellationToken stoppingToken)
{
    var channel=Channel.CreateUnbounded<FileSystemEventArgs>();

    stoppingToken.Register(()=>{
        watcher.EnableRaisingEvents=false;
        channel.Writer.TryComplete();
    });

    watcher.Changed+=(o,stoppingToken);
    return channel;
}

并将所有内容组合在一个简单的管道中:

await watcher.AsChannel(stoppingToken)
             .ProcessEvents(stoppingToken);
,

我目前的第一个解决方法是:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (!stoppingToken.IsCancellationRequested)
    {
        var blockingTask = Task.Run(async () => await Task.Delay(30000,stoppingToken));
        await Task.WhenAny(blockingTask);
    }
}

@Panagiotis Kanavos 感谢您的努力,如果我想将我的“阻塞”FSW 更改为事件驱动的 FSW,我将回到您的详细帖子。

在生产中,我使用的是这样的:

private void DoServiceWork() 
{ 
    // Some Work if new PDF or docx file is available 
    // ...
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    int myTimeout = 1000 * 60 * 60; // 1 hour

    while (!stoppingToken.IsCancellationRequested)
    {
        pdfWatchingTask = Task.Run(() => MyFSWLibrary.Watch(directory,"*.pdf",myTimeout,stoppingToken));
        docWatchingTask = Task.Run(() => MyFSWLibrary.Watch(directory,"*.docx",stoppingToken));

        var finishedTask = await Task.WhenAny(new Task<MyFSWResult>[] { waitPdfTask,waitXmpTask });
        if(finishedTask.Result.Success) DoServiceWork();
    }
}   

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?