如何在 Rx.Net 中使用 iteself 递归地执行打结/定义可观察对象?

如何解决如何在 Rx.Net 中使用 iteself 递归地执行打结/定义可观察对象?

有时业务逻辑似乎能够通过一些递归定义的可观察对象自然建模。下面是一个例子:

interface Demo {
    IObservable<CommandId> userCommands;
    IObservable<IObservable<IProcessingState>> processes;
    IObservable<CommandId> skippedCommands;
    IObservable<(CommandId,CommandResult)> runcommand(CommandId id);
}

interface IProcessingState {
    bool IsProcessing {get;}
    CommandId? ProcessingId {get;}
}

对于用户输入的每个命令,它应该在 prcocess 中触发一个正在运行的进程,或者在 skippedCommands 中发出一个值。这个逻辑的一些直接翻译也许

var validCommands = userCommands.WithLatestFrom(processes).Where(x => !x.Item2.IsProcessing)
var skippedCommands = userCommands.WithLatestFrom(processes).Where(x => x.Item2.IsProcessing)
var processes = validCommands.Select(c => runcommand(c))

如上代码所示,validCommandsprocesses 的赋值是相互递归的,我们可以等价地定义 processes 直接递归使用自身

var processes = userCommands.WithLatestFrom(processes)
                            .Where(x => !x.Item2.IsProcessing)
                            .Select(c => runcommand(c))

但是我们不能像这样在 C# 中定义 prcesses Observable。

我发现了几个可能的相关事物:

  1. Observable.Generate 构造函数。然而,它似乎以同步方式折叠自己的状态,我不知道如何在 userCommands 中使用 runcommand observable 和 Observable.Generate;

  2. RxJS 中的一些运算符如 exhaustexhaustMap,而 Rx.Net 没有提供这个运算符,有一些 3rd-party 库提供了这些运算符,例如 {{3} }. 实现就像

let exhaustMap f source =
        Observable.Create (fun (o : IObserver<_>) -> 
            let mutable hasSubscription = false
            let mutable innerSub = None
            let onInnerCompleted () =
                hasSubscription <- false
                innerSub |> Option.iter disposable.dispose
            let onOuterNext x =
                if not hasSubscription then
                    hasSubscription <- true 
                    f x |> subscribeSafeWithCallbacks 
                            o.OnNext o.OnError onInnerCompleted
                        |> fun y -> innerSub <- Some y
            source
            |> subscribeSafeWithCallbacks
                onOuterNext o.OnError o.OnCompleted)

但是,有两个问题。 一种。直接使用此运算符不符合上述要求,跳过的命令将被静忽略。我们可以稍微修改一下源代码以满足要求,但还有一个问题 湾该实现引入了两个本地可变变量和两个嵌套订阅。我不知道这是否在所有情况下都可以(会有数据竞争的风险吗?),并且更喜欢基于操作符组合而不是可变引用的解决方

  1. FSharp.Control.Reactive 提供了前向引用类型 StreamLoopCellLoop。根据 Functional Reactive Programming 一书,这些前向引用类型的 Rx 替代方案是 Subject,通过使用 Subject,上面的递归构造分为两个阶段。问题是 Intro to Rx 指出,使用 Subject 需要手动管理更多状态,至少需要处理主题,并且可能被迫热观察。我想知道是否存在不使用 Subject

    解决方
  2. window 结果上使用 runcommand 运算符与最后一个值的边界(就在完成之前),上面的 processes 可以有一些构造,但是这个解决方案需要使用结束信号两次,这需要仔细处理(尝试和调整 Take(1)zipwithLatestFromcombineLatestWindow 运算符的重载以获取期望的结果)同时发生的事件。

有没有更好的解决方案或对上述解决方案进行修改,尤其是仅使用运算符?

解决方法

你的类型都很奇怪而且很难处理。您的问题基本上是一个带有两个触发器的简单状态机:1) 新命令到达,2) 上一个命令执行完毕。

这应该会让你开始:

void Main()
{
    IObservable<ICommand> source = new Subject<ICommand>();
    var executionTerminatedLoopback = new Subject<Unit>();
    
    var stateMachine = source
        .Select(c => (command: c,type: 1))
        .Merge(executionTerminatedLoopback.Select(_ => (command: (ICommand)null,type: 2)))
        .Scan((isExecuting: false,validCommand: (ICommand)null,failedCommand: (ICommand)null),(state,msg) => {
            if(msg.type == 2)
                return (false,null,null);
            if(state.isExecuting)
                return (true,msg.command);
            else
                return (true,msg.command,null);
        });
    
    var validCommands = stateMachine.Where(t => t.validCommand != null).Select(t => t.validCommand);
    var failedCommands = stateMachine.Where(t => t.failedCommand != null).Select(t => t.failedCommand);
    
    validCommands.SelectMany(c => c.Execute()).Subscribe(executionTerminatedLoopback);
}

public interface ICommand{ 
    IObservable<Unit> Execute();
}

source 此处不必是主题,也可能不应该是:它可以是任何 IObservable<ICommand>。在答案中模拟更容易。 exeuctionTerminatedLoopback 确实必须如此,以(如您所说)将递归分解为两部分。因为是主题,所以应该保密,不要泄露。

我认为不使用 Subject 在 C# 中没有任何答案。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?