如何解决RXJS Observables - 每 500 毫秒运行一次重复函数并将结果输出到可观察流
首先 - 我是 Observables 的新手。来自 Python。
我正在努力实现:
- 每 500 毫秒调用一个函数以从图表中返回形状
- 形状以数组形式返回
- 我只想要一个过滤的子集“horizontal_lines”
- 通过 observable 输出“horizontal_lines”
下面的代码是功能性的 - 除了不重复。我无法计算 RXJS 6 的计时器、延迟或间隔。
// In this example the chart has 3 horizontal lines at 5,10 and 17 "prices"
// Desired output is 5 -- 10 -- 17
// When 4th line is added at 20 "price" expected output would be
// 5 -- 10 -- 17 -- 20
const test$ = new Observable<number>(observer => {
chart
.activeChart()
.getAllShapes()
.forEach((shape) => {
if (shape.name == 'horizontal_line') {
chart
.activeChart()
.getShapeById(shape.id)
.getPoints()
.forEach((point) => {
observer.next(+point.price.toFixed(4));
});
}
})
}).pipe(
delay(500),repeat()
解决方法
一种选择是使用 setTimeout 函数来重复你的代码而不是一个可观察的。
这是一个这样做的例子。
export class SomeComponent implements OnInit,OnDestroy {
Alive = true
ngOnInit(){
this.repeating_function()
}
ngOnDestroy(){
this.Alive = false
}
repeating_function(){
// some code
//repeat the function every 500 ms while the component is active
if (this.Alive){
setTimeout(() => {this.repeating_function()},500);
}
}
}
,
除非您的 observable 完成,否则 repeat
将不起作用:
const test$ = new Observable<number>(observer => {
chart
.activeChart()
.getAllShapes()
.forEach((shape) => {
if (shape.name == 'horizontal_line') {
chart
.activeChart()
.getShapeById(shape.id)
.getPoints()
.forEach((point) => {
observer.next(+point.price.toFixed(4));
});
}
})
observer.complete(); // <-- Complete observable after all emissions
}).pipe(
delay(500),repeat()
);
,
每 500 毫秒运行一次重复函数并将结果输出到可观察流
我认为最简单的方法是使用 interval
( 或 timer
,如果您需要立即发射)每 500 毫秒创建一个发射流,然后只需将发射映射到函数的返回值:
test$ = interval(500).pipe(
map(() => theFunction())
);
这是这个简单示例的有效 StackBlitz。显然,你可以让函数做任何你需要做的事情:-)
,我不确定到目前为止您使用 RxJS timer
和 interval
尝试了什么。您可以使用 timer
并使用 map
运算符映射到所需的数据。然后,您可以使用 Array#map
和 Array#filter
根据要求转换数据。
timer(0,500).pipe( // <-- emit immediately and after every 500ms thereafter
map(_ =>
chart
.activeChart()
.getAllShapes()
.map(shape => {
if (shape.name === 'horizontal_line')
return shape.getPoints()
})
.filter((points) => !!points) // <-- filter `undefined` from previous step
)
).subscribe({
next: points => {
/* output:
[
[ <collection of points from line 5> ],[ <collection of points from line 10> ],[ <collection of points from line 17> ],...
]
*/
}
});
如果出于某种原因,您希望将所有点组合在一个数组中并将它们连续发送到订阅中,则需要使用 swithcMap
运算符和 RxJS from
函数。
timer(0,500).pipe(
map(_ =>
chart
.activeChart()
.getAllShapes()
.map(shape => {
if (shape.name === 'horizontal_line')
return shape.getPoints()
})
.filter((points) => !!points) // <-- filter `undefined` from previous step
),switchMap(pointsColl => {
return from(pointsColl.flat())
})
).subscribe({
next: points => {
/* output:
point 1 from line 5
point 2 from line 5
...
*/
}
});
如果您需要将所有点的集合作为单个数组发出,则需要使用 of
函数而不是 from
。
// <repeat from above>
switchMap(pointsColl => {
return of(pointsColl.flat())
})
).subscribe({
next: points => {
/* output:
[ <collection of points from line 5,10 and 17> ]
*/
}
});
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。