奇怪的 Rx+CancellationToken 问题:有时注册的回调没有完成

如何解决奇怪的 Rx+CancellationToken 问题:有时注册的回调没有完成

我观察到一个奇怪的现象,有时会出现在我编写的 Rx 查询中,它涉及 CancellationToken。两个回调注册到同一个 CancellationToken一个查询之外,另一个查询的一部分。 CancellationToken 的目的是发出查询终止的信号。出现的情况是有时第二个回调卡在执行中间,永远无法完成,阻止调用一个回调。

以下是重现该问题的最小示例。它不是很小,但我不能进一步减少它。例如,将 Switch 运算符替换为 Merge 会使问题消失。如果 Task.Delay(1000,cts.Token) 抛出的异常被吞下,也会发生同样的情况。

public class Program
{
    public static void Main()
    {
        var cts = new CancellationTokenSource(500);
        cts.Token.Register(() => Console.WriteLine("### Token Canceled! ###"));
        try
        {
            Observable
                .Timer(TimeSpan.Zero,TimeSpan.FromMilliseconds(1000))
                .takeuntil(Observable.Create<Unit>(observer =>
                    cts.Token.Register(() =>
                    {
                        Console.WriteLine("Before observer.OnNext");
                        observer.OnNext(Unit.Default);
                        Console.WriteLine("After observer.OnNext");
                    })))
                .Select(_ =>
                {
                    return Observable.StartAsync(async () =>
                    {
                        Console.WriteLine("Action starting");
                        await Task.Delay(1000,cts.Token);
                        return 1;
                    });
                })
                .Switch()
                .Wait();
        }
        catch (Exception ex) { Console.WriteLine("Failed: {0}",ex.Message); }
        Thread.Sleep(500);
        Console.WriteLine("Finished");
    }
}

预期输出

Action starting
Before observer.OnNext
After observer.OnNext
### Token Canceled! ###
Failed: A task was canceled.
Finished

实际输出(有时):

Action starting
Before observer.OnNext
Failed: A task was canceled.
Finished

Try it on fiddle。在问题出现之前,您可能需要运行该程序 3-4 次。请注意缺少的两个日志条目。似乎调用 observer.OnNext(Unit.Default); 永远不会完成。

我的问题是:有谁知道是什么导致了这个问题?此外,我如何修改查询CancellationToken 相关部分,以便它执行其预期目的(终止查询),而不干扰相同 CancellationToken 的其他已注册回调?

.NET 5.0.1 & .NET Framework 4.8、System.Reactive 5.0.0、C# 9


一个观察结果:如果我修改 Observable.Create 委托使其返回 disposable.Empty 而不是 CancellationTokenRegistration,如下所示:

.takeuntil(Observable.Create<Unit>(observer =>
{
    cts.Token.Register(() =>
    {
        Console.WriteLine("Before observer.OnNext");
        observer.OnNext(default);
        Console.WriteLine("After observer.OnNext");
    });
    return disposable.Empty;
}))

但我不认为忽略 cts.Token.Register 返回的注册是一种解决方法

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?