替换代表状态的 Rx.NET Subject 的方法

如何解决替换代表状态的 Rx.NET Subject 的方法

我目前正在使用以下方法修复错误,该方法stateChecker 时轮询 null 条件,直到它变为 true(或 false 由于超时):>

private static void WaitWithSubject(
   Func<bool> stateChecker,TimeSpan timeout,TimeSpan stepTime,string errorMessage,ILifetimeInfo lifetimeInfo)
{
   (bool? IsOk,string Message) state = (IsOk: null,Message: string.Empty);
   var waitCancellation = (int)stepTime.TotalMilliseconds;
   using (var stateSubject = new Subject<(bool? IsOk,string Message)>())
   {
      using (Observable.Timer(timeout).Subscribe(it => stateSubject.OnNext((IsOk: false,Message: errorMessage))))
      using (Observable.Timer(TimeSpan.Zero,stepTime).
         Subscribe(it =>
         {
            if (stateChecker())
               stateSubject.OnNext((IsOk: true,Message: string.Empty));
         }))
      {
         using (stateSubject.Subscribe(it => state = it))
         {
            while (state.IsOk == null)
               lifetimeInfo.Canceler.ThrowIfCancellationRequested(waitCancellation);
            if (state.IsOk != true)
               throw new TimeoutException(state.Message);
            stateSubject.OnCompleted();
         }
      }
   }
}

方法偶尔会在执行方法 ObjectdisposedException代码中的以下点生成 OnNext :

if ( stateChecker() )
    stateSubject.OnNext( ( IsOk: true,Message: string.Empty ) );

在这种情况下,有没有办法完全避免使用 Subject 以支持诸如 Observable.IntervalObservable.Create 之类的东西?

解决方法

在我看来,这就是你想要做的:

private static void WaitWithSubject(Func<bool> stateChecker,TimeSpan timeout,TimeSpan stepTime,string errorMessage,ILifetimeInfo lifetimeInfo) =>
    Observable
        .Amb(
            Observable
                .Timer(timeout)
                .SelectMany(_ => Observable.Throw<Unit>(new TimeoutException(errorMessage))),Observable
                .Timer(TimeSpan.Zero,stepTime)
                .Where(_ => stateChecker())
                .Select(_ => Unit.Default))
        .Take(1)
        .Wait();

这里的关键是 Amb 运算符,它启动两个序列,并且只返回第一个序列的值以产生值或错误。 Take(1) 确保可观察对象在产生值后立即完成。

如果您有 Wait(),您可以在 CancellationToken 之前添加以下行以取消:

.TakeUntil(Observable.Create<Unit>(o => ct.Register(() => o.OnNext(Unit.Default))))

在与 Theodor 反复讨论之后,我想出了这个我认为可能是我能想到的最干净的版本:

private static void WaitWithSubject(Func<bool> stateChecker,ILifetimeInfo lifetimeInfo)
{
    var good =
        Observable
            .Timer(TimeSpan.Zero,stepTime)
            .Where(_ => stateChecker())
            .Take(1);

    var fail =
        Observable
            .Timer(timeout)
            .SelectMany(_ => Observable.Throw<long>(new TimeoutException(errorMessage)));
    
    good.Merge(fail).RunAsync(lifetimeInfo.Canceler).Wait();
}
,

这是一种与 WaitWithSubject 具有类似行为的方法,但不使用 Subject<T>。它使用 Merge 运算符,以便将两个计时器生成的序列合并为一个序列。它还具有取消支持。

public static void WaitUntilTrueState(
    Func<bool> stateChecker,TimeSpan checkInterval,string timeoutMessage,CancellationToken cancellationToken = default)
{
    Observable
        .Timer(TimeSpan.Zero,checkInterval)
        .Merge(Observable.Timer(timeout).IgnoreElements()
            .Concat(Observable.Throw<long>(new TimeoutException(timeoutMessage))))
        .TakeUntil(_ => stateChecker())
        .RunAsync(cancellationToken)
        .Wait();
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?