微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

c# – 以固定或最小间隔处理Rx事件

我有一系列事件,每10-1000毫秒发生一次.我订阅了这个事件来源,但希望以500ms的固定(或最小)间隔处理它们.
我也想一次处理一个事件,而不是批量处理(如Buffer(x> 1)).

代码中的这样的东西:

observable.MinimumInterval(TimeSpan.FromMiliseconds(500)).Subscribe(v=>...);

试图例如:

observable.Buffer(1).Delay(TimeSpan.FromMiliseconds(500).Subscribe(v=>...);

以及许多其他潜在的解决方案.到目前为止没有运气.

有任何想法吗?

解决方法

我回答了这个问题 on my blog here.

通过添加呈现作为扩展方法来再现(在链接腐烂的情况下!):

将Rx中的事件流约束到最大速率

有时,您希望限制事件从Rx流到达的速率.

如果另一个事件在指定的时间间隔内到达,则Throttle操作符将抑制该事件.这在许多情况下非常有用,但它确实有两个重要的副作用 – 即使未被抑制的事件也会被间隔延迟,如果事件太快到达,事件将完全被丢弃.

我遇到了这两种情况都不可接受的情况.在这种特殊情况下,所需的行为如下:事件应以TimeSpan指定的最大速率输出,否则应尽快输出.

一种解决方案就是这样.想象一下,我们的输入流是一群人到达火车站.对于我们的输出,我们希望人们以最高速度离开车站.我们设定最高费率,让每个人站在平板铁路卡车的前面,然后以固定的速度将卡车送出车站.因为只有一条轨道,并且所有卡车以相同的速度行驶并且具有相同的长度,所以当卡车背对背地离开时,人们将以最大速率离开车站.但是,如果赛道清晰,下一个人将能够立即离开.

那么我们如何将这个比喻翻译成Rx呢?

我们将使用Concat运算符接受流的流并将它们背靠背地合并在一起 – 就像将铁路卡车送到轨道上一样.

为了将每个人的等价物放到铁路卡车上,我们将使用选择将每个事件(人)投射到以单个OnNext事件(人员)开头并以OnComplete完全结束的可观察序列(铁路卡车).稍后定义的间隔.

让我们假设输入事件是变量输入中的IObservable.这是代码

var paced = input.Select(i => Observable.Empty<T>()
                                        .Delay(interval)
                                        .StartWith(i)).Concat();

作为一种扩展方法,这将成为:

public static IObservable<T> Pace<T>(this IObservable<T> source,TimeSpan interval)
{
    return source.Select(i => Observable.Empty<T>()
                                        .Delay(interval)
                                        .StartWith(i)).Concat();

}

原文地址:https://www.jb51.cc/csharp/98259.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐