如何解决从外部对 BackgroundService 中的取消做出反应
如果我在调试模式下在 .NET Core BackgroundService 中启动以下示例:
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
Task.Run(async () => await Task.Delay(30000,stoppingToken))
.Wait(stoppingToken);
}
}
Ctrl + C
取消事件不会调用从 StopAsync()
调用 Cancel()
的 CancellationTokenSource
方法。
我认为我的问题类似于 this 帖子。
当我在 ExecuteAsync 中使用阻塞方法时,如何捕获这些取消?
ps:在现实世界中,我的 ExecuteAsync 正在监视文件系统,直到在我的目的地创建新文件。为了实现这种行为,我使用了 FileSystemWatcher.WaitForChanged() 方法。
解决方法
从评论来看,问题似乎与线程无关。真正的问题是如何停止 FileSystemWatcher。
您不需要额外的线程与 FileSystemWatcher,您需要尽快处理其更改事件。您可以为此使用异步事件处理程序,或者更好的是,将事件快速发布到队列或通道进行处理。
要停止 FSW,您可以使用 CancellationToken.Register 方法将 EnableRaisingEvents
设置为 false
:
stoppingToken.Register(()=>watcher.EnableRaisingEvents=false);
事件处理
为了快速处理事件,可以将 FileSystemEventArgs 值直接发布到队列或 Channel 中,并使用其他任务处理它们。这有两个好处:
- 尽可能快地处理文件事件,因此不会丢失任何文件
- 代码可以等待所有事件完成,也可以取消它们。
var channel=Channel.CreateUnbounded<FileSystemEventArgs>();
stoppingToken.Register(()=>{
watcher.EnableRaisingEvents=false;
channel.Writer.TryComplete();
});
watcher.Changed+=(o,e)=>channel.Writer.WriteAsync(e,stoppingToken);
await foreach(var e in channel.Reader.ReadAllAsync(stoppingToken))
{
//do something
}
一个Channel可以看成是一个异步读写操作的队列。 ReadAllAsync 方法使消息出列直到停止并将它们作为 IAsyncEnumerable 返回,这允许使用 await foreach
轻松地异步处理项目。
管道和渠道
代码可以重构为:
await watcher.AsChannel(stoppingToken)
.ProcessEvents(stoppingToken);
消费者
很容易将订阅者代码提取到单独的方法中。这甚至可以是一个扩展方法:
public static async Task ProcessEvents(this ChannelReader<FileSystemEventArgs> reader,CancellationToken stoppingToken)
{
await foreach(var e in channel.Reader.ReadAllAsync(stoppingToken))
{
//do something
}
}
并称之为:
var channel=Channel.CreateUnbounded<FileSystemEventArgs>();
stoppingToken.Register(()=>{
watcher.EnableRaisingEvents=false;
channel.Writer.TryComplete();
});
watcher.Changed+=(o,stoppingToken);
await ProcessEvents(channel,stoppingToken);
这是有效的,因为 Channel
有 implicit cast operators 到 ChannelReader
和 ChannelWriter
。
ChannelReader 支持多个消费者,因此可以使用多个任务来处理事件,例如:
public static async Task ProcessEvents(this ChannelReader<FileSystemEventArgs> reader,int dop,CancellationToken stoppingToken)
{
var tasks=Enumerable.Range(0,dop).Select(()=>{
await foreach(var e in channel.Reader.ReadAllAsync(stoppingToken))
{
//do something
}
});
await Task.WhenAll(tasks);
}
制作人
也可以将频道创建和发布提取到单独的方法中。毕竟,我们只需要 ChannelReader 进行处理:
public static ChannelReader AsChannel(this FileSystemWatcher watcher,CancellationToken stoppingToken)
{
var channel=Channel.CreateUnbounded<FileSystemEventArgs>();
stoppingToken.Register(()=>{
watcher.EnableRaisingEvents=false;
channel.Writer.TryComplete();
});
watcher.Changed+=(o,stoppingToken);
return channel;
}
并将所有内容组合在一个简单的管道中:
await watcher.AsChannel(stoppingToken)
.ProcessEvents(stoppingToken);
,
我目前的第一个解决方法是:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
var blockingTask = Task.Run(async () => await Task.Delay(30000,stoppingToken));
await Task.WhenAny(blockingTask);
}
}
@Panagiotis Kanavos 感谢您的努力,如果我想将我的“阻塞”FSW 更改为事件驱动的 FSW,我将回到您的详细帖子。
在生产中,我使用的是这样的:
private void DoServiceWork()
{
// Some Work if new PDF or docx file is available
// ...
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
int myTimeout = 1000 * 60 * 60; // 1 hour
while (!stoppingToken.IsCancellationRequested)
{
pdfWatchingTask = Task.Run(() => MyFSWLibrary.Watch(directory,"*.pdf",myTimeout,stoppingToken));
docWatchingTask = Task.Run(() => MyFSWLibrary.Watch(directory,"*.docx",stoppingToken));
var finishedTask = await Task.WhenAny(new Task<MyFSWResult>[] { waitPdfTask,waitXmpTask });
if(finishedTask.Result.Success) DoServiceWork();
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。