如何解决如何在Rx.Net中实现exhaustMap处理程序?
我正在从exhaustMap
寻找类似于rxjs
运算符的内容,但是RX.NET
似乎没有这样的运算符。
我需要实现的是,在源流的每个元素上,我需要启动一个async
处理程序,直到完成,我想从源中删除任何元素。处理程序完成后,立即恢复使用元素。
我不希望在每个元素上启动异步处理程序-在处理程序运行时,我想删除源元素。
我还怀疑我需要在这里巧妙地使用defer运算符吗?
谢谢!
解决方法
这是ExhaustMap
运算符的实现。可观察到的源投影到IObservable<Task<TResult>>
上,其中每个后续任务(如果仍在运行)是前一个任务,否则是与当前项目关联的新任务。然后使用DistinctUntilChanged
运算符删除重复出现的同一任务,最后使用Concat
运算符将可观察对象展平。
public static IObservable<TResult> ExhaustMap<TSource,TResult>(
this IObservable<TSource> source,Func<TSource,Task<TResult>> function)
{
return source
.Scan(Task.FromResult<TResult>(default),(previousTask,item) =>
{
return !previousTask.IsCompleted ? previousTask : HideIdentity(function(item));
})
.DistinctUntilChanged()
.Concat();
async Task<TResult> HideIdentity(Task<TResult> task) => await task;
}
不能保证function
返回的任务是不同的,因此需要HideIdentity
局部函数来返回任务的不同包装。
用法示例:
Observable
.Interval(TimeSpan.FromMilliseconds(200))
.Select(x => (int)x + 1)
.Take(10)
.Do(x => Console.WriteLine($"Input: {x}"))
.ExhaustMap(async x => { await Task.Delay(x % 3 == 0 ? 500 : 100); return x; })
.Do(x => Console.WriteLine($"Result: {x}"))
.Wait();
输出:
Input: 1
Result: 1
Input: 2
Result: 2
Input: 3
Input: 4
Input: 5
Result: 3
Input: 6
Input: 7
Input: 8
Result: 6
Input: 9
Input: 10
Result: 9
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。