“等待”监视IObservable< Status>,我想等待(所以我可以通过ContinueWith和其他任务进行线程).
我从OnNext处理订阅者开始执行以下任务,但这只是丑陋.我现在想到的是这种扩展方法:
public static Task<T> WaitFor<T>(this IObservable<T> source,Func<T,bool> pred) { var tcs = new taskcompletionsource<T>(); source .Where(pred) .distinctUntilChanged() .Take(1) //OnCompletes the observable,subscription will self-dispose .Subscribe(val => tcs.TrySetResult(val),ex => tcs.TrySetException(ex),() => tcs.TrySetCanceled()); return tcs.Task; }
(更新svick的处理OnCompleted和OnError的建议)
问题:
这是好,坏,还是丑?
我错过了一个可以做到这一点的现有扩展?
>正确的顺序是Where and distinctUntilChanged? (我想他们是)
解决方法
public static Task<T> WaitFor<T>(this IObservable<T> source,bool> pred) { return source .Where(pred) .distinctUntilChanged() .Take(1) .ToTask(); }
使用.ToTask()比引入taskcompletionsource要好得多.您需要引用System.Reactive.Threading.Tasks命名空间来获取.ToTask()扩展方法.
此外,distinctUntilChanged在此代码中是多余的.你只能得到一个值,所以默认情况下它必须是不同的.
现在,我的下一个建议可能有点有争议.这个扩展是一个坏主意,因为它隐藏了正在发生的真正语义.
var t = xs.WaitFor(x => x > 10);
要么:
var t = xs.Where(x => x > 10).Take(1).ToTask();
我通常喜欢第二个snippit,因为它清楚地显示了我发生了什么 – 我不需要记住WaitFor的语义.
除非您将WaitFor的名称更具描述性 – 或许是TakeOneAsTaskWhere,那么您将使用明确的代码来使用操作符,并使代码更难管理.
以下是不是更容易记住语义?
var t = xs.TakeOneAsTaskWhere(x => x > 10);
对我来说,底线是Rx运算符是组合的,而不是封装,但是如果你要封装它们,那么它们的含义就必须清楚.
我希望这有帮助.
原文地址:https://www.jb51.cc/csharp/91411.html
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。