如何解决RxJS 过滤/分组去抖动 操作员使用 partitionOn 进行优先去抖动priorityDebounceTime 运算符使用 priorityDebounceTime 进行优先去抖动
我正在尝试使用 RxJS 去抖动操作符,但我想自定义源的发射何时去抖动。
默认情况下,来自去抖动窗口内源的任何发射将导致之前的发射被丢弃。根据源发射的值,我希望仅某些来自源的排放计入去抖动操作。
{
priority: 'low' //can be 'low' or 'medium' or 'high
}
我希望去抖动按对象的优先级分组。这意味着只有当一个发射具有相同的优先级时,它才会被另一个发射去抖动。
即只有 'low'
排放可以消除 'low'
排放,并且只有 'high'
排放可以消除 'high'
排放。如果 'medium'
发射在 'low'
发射等待时到来,它不会导致 'low'
发射被丢弃。
这意味着如果我快速连续发射 'low'
和 'medium'
,两者都会通过。如果我快速连续发射两次 'low'
,那么只有最后一次会通过。
这是我想出的:
const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000
$source.pipe(
mergeMap(value => {
// We start a race of the value with a delay versus any other emissions from the source with the same priority
return race(
timer(delay).pipe(mapTo(value)),$source.pipe(
filter(v => v.priority === value.priority),)
).pipe(
take(1),// If another emission with the same priority comes before the delay,the second racer it will win the race.
// If no emission with the same priority comes,the first racer will win.
//
// If the first racer wins,this equality check is satisfied and the value is passed through.
// If the second racer wins,the equality check fails and no value is emitted. Since this is a mergeMap,this whole process will start again for that emission.
filter(v => v === value),)
})
)
我认为以上是正确的,但我想知道我是否遗漏了什么或使这种方式比需要的更复杂?上面的代码应该像合并 $low.pipe(debounceTime(delay))
$medium.pipe(debounceTime(delay))
和 $high.pipe(debounceTime(delay))
的三个独立流一样工作。
谢谢!!
解决方法
我认为您的回答有效。这也很清楚。但是,您必须确保您的 $source
是多播的。
我认为您的方法有一个缺点:
你做了很多额外的计算。如果您每秒对 1000 个值进行去抖动,则根据运行位置的不同,它可能会明显变慢。
每个流值可以在任意数量的比赛中。来自不同优先级的输入仍然相互竞争,当下一个值开始竞争时,前一个竞争不会停止,因此如果一次到达很多值,您可能会遇到计时器/竞争的爆炸式增长。
设置和删除了很多额外的计时器。在您的情况下,您应该最多需要三个计时器,每个计时器都会在相同优先级的新值到达时重置。
如果您的代码不在关键路径上,那可能不是问题。否则,还有其他方法。不过,我想到的那个在代码方面有点笨重。
对流进行分区
这是我的大脑如何解决这个问题的。我创建了一个操作符,它执行 RxJS partition
操作符的功能,但允许您划分为两个以上的流。
我的方法在内部处理多播,因此源可以是任何内容(热、冷、多播或非多播)。它(内部)为每个流设置一个主题,然后您可以像往常一样使用 RxJS 的 debounceTime。
不过有一个缺点。在您的方法中,您可以随意添加一个新的优先级字符串,它应该会继续工作。 {priority: "DucksSayQuack"} 的对象将相互去抖动而不影响其他优先级。这甚至可以即时完成。
下面的 partitionOn
操作符需要提前知道分区。对于您所描述的情况,它应该具有相同的输出并且启动效率更高。
这样更好吗?我不知道,这是解决同一问题的有趣且不同的方法。此外,我认为 partitionOn
运算符的用途比分区去抖动更多。
操作员
/***
* Create a partitioned stream for each value where a passed
* predicate returns true
***/
function partitionOn<T>(
input$: Observable<T>,predicates: ((v:T) => boolean)[]
): Observable<T>[] {
const partitions = predicates.map(predicate => ({
predicate,stream: new Subject<T>()
}));
input$.subscribe({
next: (v:T) => partitions.forEach(prt => {
if(prt.predicate(v)){
prt.stream.next(v);
}
}),complete: () => partitions.forEach(prt => prt.stream.complete()),error: err => partitions.forEach(prt => prt.stream.error(err))
});
return partitions.map(prt => prt.stream.asObservable());
}
使用 partitionOn 进行优先去抖动
const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 1000;
const priorityEquals = a => b => a === b?.priority;
merge(
...partitionOn(
$source,[priorityEquals('low'),priorityEquals('medium'),priorityEquals('high')]
).map(s => s.pipe(
debounceTime(1000)
))
);
为您的流添加时间戳
此方法与您的方法非常相似,可让您再次使用优先级字符串。这有一个类似的问题,即每个值都被扔进一个计时器,并且计时器不会在新值到达时取消。
然而,通过这种方法,取消不必要的计时器的途径更加清晰。您可以在 priorityTimeStamp
映射中将订阅对象与时间戳一起存储,并确保在新值到达时取消订阅。
我真的不知道这会对性能造成什么影响,我认为 JavaScript 的事件循环非常健壮/高效。这种方法的好处是您无需支付多播成本。这实际上只是一个流,使用查找图来决定哪些内容被过滤,哪些内容没有。
priorityDebounceTime 运算符
function priorityDebounceTime<T>(
dbTime: number,priorityStr = "priority"
): MonoTypeOperatorFunction<T> {
return s => defer(() => {
const priorityTimeStamp = new Map<string,number>();
return s.pipe(
mergeMap(v => {
priorityTimeStamp.set(v[priorityStr],Date.now());
return timer(dbTime).pipe(
timestamp(),filter(({timestamp}) =>
timestamp - priorityTimeStamp.get(v[priorityStr]) >= dbTime
),mapTo(v)
)
})
)
});
}
使用 priorityDebounceTime 进行优先去抖动
这显然要简单一些:
const $source = // some observable of type { priority: 'low' | 'medium' | 'high' }
const delay = 5000;
$source.pipe(
priorityDebounceTime(delay)
).subscribe(console.log);
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。