微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

javascript – 根据rxjs中的计时处理事件流

我有一个进程,每隔一段时间发送一次数据包,我需要根据数据包到达的时间等来管理该数据流.在某些时候,我也关闭流和过程.

现在,我正在使用一组计时器来做这件事,但我希望我可以用rxjs来做,因为它似乎非常适合这种事情.到目前为止,我没有取得多大成功.

问题

该流应该定期向我发送数据包,但它通常偏离很多,有时会卡住.

我希望在以下条件下关闭流:

>如果需要超过startDelay发送第一个数据包.
>发送第一个数据包后,如果两个数据包之间存在多于middleDelay的暂停.
>经过一段时间的maxChannelTime.

由于上述任何原因我即将关闭流时,我首先要求它礼貌地关闭以便它可以进行一些清理.有时它还会在清理过程中向我发送最终数据包.但是我想等待清理时间不要超过cleanupTime,并且在关闭流之前到达最后一个数据并忽略更多消息.

我将通过使用Observable包装事件来创建“流”.我这样做没有问题.

通过“关闭”流,我的意思是告诉进程停止发送数据,并可能关闭(即死亡).

解决方法

棘手的问题.

我把它分解为两个阶段 – “受监管”(因为我们要定期检查)和“清理”.

向后工作,输出

const regulated = source.takeuntil(close)
const cleanup = source.skipUntil(close).takeuntil(cleanupCloser)
const output = regulated.merge(cleanup)

‘闭门器’是在关闭时发出的可观察量(每个超时值更近一个).

const startTimeout = 600
const intervalTimeout = 200
const maxtimeTimeout = 3000
const cleanupTimeout = 300

const startCloser = Observable.timer(startTimeout)  // emit once after initial delay
  .takeuntil(source)                                // cancel after source emits
  .mapTo('startTimeoutMarker')

const intervalCloser = source.switchMap(x =>    // reset interval after each source emit
    Observable.timer(intervalTimeout)           // emit once after intervalTimeout
      .mapTo('intervalTimeoutMarker')
  )

const maxtimeCloser = Observable.timer(maxtimeTimeout)  // emit once after maxtime
  .takeuntil(startCloser)                               // cancel if startTimeout
  .takeuntil(intervalCloser)                            // cancel if intervalTimeout
  .mapTo('maxtimeTimeoutMarker')

const close = Observable.merge(startCloser,intervalCloser,maxtimeCloser).take(1)

const cleanupCloser = close.switchMap(x =>      // start when close emits
     Observable.timer(cleanupTimeout)           // emit once after cleanup time
  ) 
  .mapTo('cleanupTimeoutMarker')

这是一个工作样本CodePen(请一次运行一个测试)

原文地址:https://www.jb51.cc/js/156701.html

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐