如何解决在Rxjs中,如何暂停数据流并从第二个开始继续呢?
我有一个数据流每10秒进行一次长时间轮询。长轮询可以暂停,然后继续。
问题是:在继续时重新开始。我必须让它从第二个开始继续它被暂停。
所以,如果:
延迟(10000)
暂停为(7000)
然后,当我再次开始长时间轮询时,必须在(10000-7000)= 3000
之后提取我的数据今天是第四天,我一直被这个问题困扰着,发疯了。
我创造了这次闪电战
https://stackblitz.com/edit/ang7-working-rxjs-stream-timer-combined?file=src/app/app.component.ts
创建了一个流和一个提取数据的计时器。当流量和计时器停止时,设置剩余的时间以及新的延迟。但是当我按下开始键时,它不会再等待3秒钟,它会立即校准我不想要的服务
解决方法
这是一个非常基本的实现,它依赖于timer
函数以及take
,filter
和takeUntil
运算符。
我认为解决方案不太优雅。它取决于状态变量paused
和first
,并在订阅的next
回调内部具有递归触发器。另请注意,它目前不执行任何错误处理。在这种情况下,轮询将完全停止。
控制器
export const REFRESH_INTERVAL = 11;
@Component({ ... })
export class AppComponent implements OnDestroy {
pause$ = new Subject();
reset$ = new Subject();
closed$ = new Subject();
counter = REFRESH_INTERVAL;
res: any;
paused = false;
constructor(private freeApiService: freeApiService) {}
ngOnInit() {
this.init();
}
init() {
let first = true;
this.reset$
.pipe(
switchMap(_ =>
timer(0,1000).pipe(
take(this.counter),tap(_ => (this.paused = this.paused ? false : this.paused)),takeUntil(this.pause$),filter(_ => first || --this.counter === 0),finalize(
() => (this.paused = this.counter != 0 ? true : this.paused)
),switchMap(_ => this.freeApiService.getDummy())
)
),takeUntil(this.closed$)
)
.subscribe({
next: res => {
first = false;
this.res = res;
this.counter = REFRESH_INTERVAL;
this.reset$.next();
},error: error => console.log("Error fethcing data:",error)
});
}
ngOnDestroy() {
this.closed$.next();
}
}
模板
<div my-app>
<ng-container *ngIf="res; else noRes">
<button (mouseup)="paused ? reset$.next() : pause$.next()">
{{ paused ? 'Resume poll' : 'Pause poll' }}
</button>
<br><br>
Refreshing in: {{ counter }}
<br>
Response:
<pre>{{ res | json }}</pre>
</ng-container>
<ng-template #noRes>
<button (mouseup)="reset$.next()">
Start poll
</button>
</ng-template>
</div>
我已经修改了您的Stackblitz
,我将使用interval
,filter
和scan
运算符的组合,并且将检查频率设置为1s:
const pauserSubject = new BehaviorSubject(true);
const s$ = interval(1000).pipe(
withLatestFrom(pauserSubject),filter(([intervalValue,isAllowed]) => isAllowed),scan(acc => acc + 1,0),tap(emissionNumber => console.log('check attempt #',emissionNumber,' on ',new Date().toTimeString())),filter(emissionCount => !(emissionCount % 10)),switchMapTo(of('request/response'))
);
在这种情况下,常规interval
设置“节拍”,每隔1秒检查一次暂停是对还是错。取消暂停后,相应的filter
运算符允许继续。 scan
验证每10次检查(或每10秒检查一次)是否会导致http调用。
针对您的情况:
- 您每10秒钟开始轮询一次
- 您在第三秒暂停一秒钟
- 计数器确保应再进行7次检查(导致等待7秒)
- http呼叫是在第11秒(常规10分钟+等待1秒)进行的
这归结为具有“暂停计时器”。一旦有了它,就可以使用repeat
运算符在完成时重新运行它:
somePausableTimer$.pipe(
repeat(),switchMap(() => of('polling time'))
)
如果您关心时间精度,这是我对“可暂停计时器”的看法-我认为它与javascript所能达到的精度差不多,但是不会带来诸如“每10秒检查一次暂停状态”之类的解决方案带来的性能损失。毫秒”:
import {delay,distinctUntilChanged,filter,map,mapTo,pairwise,repeat,scan,startWith,switchMap,take,withLatestFrom} from 'rxjs/operators';
import {defer,fromEvent,NEVER,Observable,of} from 'rxjs';
function pausableTimer(timerSpan: number,isActive$: Observable<boolean>): Observable<boolean> {
const activeState$ = isActive$.pipe(
distinctUntilChanged(),startWith(true,true),map(isActive => ({
isActive,at: Date.now()
}))
);
const pauseSpans$ = activeState$.pipe(
pairwise(),filter(([,curr]) => curr.isActive),map(([prev,curr]) => curr.at - prev.at)
);
const accumulatedPauseSpan$ = pauseSpans$.pipe(
scan((acc,curr) => acc += curr,0)
);
return defer(() => {
const startTime = Date.now();
const originalEndTime = startTime + timerSpan;
return activeState$.pipe(
withLatestFrom(accumulatedPauseSpan$),switchMap(([activeState,accPause]) => {
if (activeState.isActive) {
return of(true).pipe(
delay(originalEndTime - Date.now() + accPause)
);
}
else {
return NEVER;
}
}),take(1)
);
});
}
该函数接受以毫秒为单位的timerSpan
(在您的情况下为10000)和观察到的isActive$
,它发出true
/ false
用于恢复/暂停。综上所述:
const isActive$ = fromEvent(document,'click').pipe(scan(acc => !acc,true)); // for example
pausableTimer(10000,isActive$).pipe(
repeat(),switchMap(() => of('polling time'))
).subscribe();
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。