如何解决RxJs - 如何使 observable 表现得像队列 异步事务旁白:为什么要使用 defer?
我正在努力实现下一个目标:
private beginTransaction(): Observable() {
..
}
private test(): void {
this.beginTransaction().subscribe((): void => {
this.commitTransaction();
});
this.beginTransaction().subscribe((): void => {
this.commitTransaction();
});
}
beginTransaction 可以被并发调用,但应该延迟 observable 直到第一个或只有一个 beginTransaction 完成。
换句话说:任何时候只能进行一笔交易。
我尝试了什么:
private transactionInProgress: boolean = false;
private canBeginTransaction: Subject<void> = new Subject<void>();
private bla3(): void {
this.beginTransaction().subscribe((): void => {
console.log('beginTransaction 1');
this.commitTransaction();
});
this.beginTransaction().subscribe((): void => {
console.log('beginTransaction 2');
this.commitTransaction();
});
this.beginTransaction().subscribe((): void => {
console.log('beginTransaction 3');
this.commitTransaction();
});
}
private commitTransaction(): void {
this.transactionInProgress = false;
this.canBeginTransaction.next();
}
private beginTransaction(): Observable<void> {
if(this.transactionInProgress) {
return of(undefined)
.pipe(
skipUntil(this.canBeginTransaction),tap((): void => {
console.log('begin transaction');
})
);
}
this.transactionInProgress = true;
return of(undefined);
}
解决方法
您所问的问题非常模糊和笼统。毫无疑问,更受限制的场景可能看起来要简单得多。
无论如何,我在这里创建了一个管道,每次只允许订阅一次 transaction(): Observable
。
可能的样子:
/****
* Represents what each transaction does. Isn't concerned about
* order/timing/'transactionInProgress' or anything like that.
*
* Here is a fake transaction that just takes 3-5 seconds to emit
* the string: `Hello ${name}`
****/
function transaction(args): Observable<string> {
const name = args?.message;
const duration = 3000 + (Math.random() * 2000);
return of("Hello").pipe(
tap(_ => console.log("starting transaction")),switchMap(v => timer(duration).pipe(
map(_ => `${v} ${name}`)
)),tap(_ => console.log("Ending transation"))
);
}
// Track transactions
let currentTransactionId = 0;
// Start transactions
const transactionSubj = new Subject<any>();
// Perform transaction: concatMap ensures we only start a new one if
// there isn't a current transaction underway
const transaction$ = transactionSubj.pipe(
concatMap(({id,args}) => transaction(args).pipe(
map(payload => ({id,payload}))
)),shareReplay(1)
);
/****
* Begin a new transaction,we give it an ID since transactions are
* "hot" and we don't want to return the wrong (earlier) transactions,* just the current one started with this call.
****/
function beginTransaction(args): Observable<any> {
return defer(() => {
const currentId = currentTransactionId++;
transactionSubj.next({id: currentId,args});
return transaction$.pipe(
first(({id}) => id === currentId),map(({payload}) => payload)
);
})
}
// Queue up 3 transactions,each one will wait for the previous
// one to complete before it will begin.
beginTransaction({message: "Dave"}).subscribe(console.log);
beginTransaction({message: "Tom"}).subscribe(console.log);
beginTransaction({message: "Tim"}).subscribe(console.log);
异步事务
当前设置要求事务是异步的,否则您可能会丢失第一个事务。解决这个问题的方法并不简单,所以我构建了一个订阅的操作符,然后尽快调用一个函数。
这是:
function initialize<T>(fn: () => void): MonoTypeOperatorFunction<T> {
return s => new Observable(observer => {
const bindOn = name => observer[name].bind(observer);
const sub = s.subscribe({
next: bindOn("next"),error: bindOn("error"),complete: bindOn("complete")
});
fn();
return {
unsubscribe: () => sub.unsubscribe
};
});
}
这里正在使用:
function beginTransaction(args): Observable<any> {
return defer(() => {
const currentId = currentTransactionId++;
return transaction$.pipe(
initialize(() => transactionSubj.next({id: currentId,args})),first(({id}) => id === currentId),map(({payload}) => payload)
);
})
}
旁白:为什么要使用 defer
?
考虑重写 beginTransaction:
function beginTransaction(args): Observable<any> {
const currentId = currentTransactionId++;
return transaction$.pipe(
initialize(() => transactionSubj.next({id: currentId,map(({payload}) => payload)
);
}
在这种情况下,ID 在您调用 beginTransaction
时设置。
// The ID is set here,but it won't be used until subscribed
const preppedTransaction = beginTransaction({message: "Dave"});
// 10 seconds later,that ID gets used.
setTimeout(
() => preppedTransaction.subscribe(console.log),10000
);
如果在没有初始化运算符的情况下调用 transactionSubj.next
,那么这个问题会变得更糟,因为 transactionSubj.next
也会在订阅 observable 前 10 秒被调用(你肯定会错过输出)
问题仍然存在:
如果你想订阅同一个 observable 两次怎么办?
const preppedTransaction = beginTransaction({message: "Dave"});
preppedTransaction.subscribe(
value => console.log("First Subscribe: ",value)
);
preppedTransaction.subscribe(
value => console.log("Second Subscribe: ",value)
);
我希望输出为:
First Subscribe: Hello Dave
Second Subscribe: Hello Dave
相反,你得到
First Subscribe: Hello Dave
First Subscribe: Hello Dave
Second Subscribe: Hello Dave
Second Subscribe: Hello Dave
因为您在订阅时没有获得新 ID,所以两个订阅共享一个 ID。 defer
通过在订阅前不分配 ID 来解决此问题。这在管理流中的错误时变得非常重要(让您在 observable 出错后重新尝试)。
我不确定我是否正确理解了问题,但在我看来,concatMap
是您正在寻找的运算符。
示例如下
const transactionTriggers$ = from([
't1','t2','t3'
])
function processTransation(trigger: string) {
console.log(`Start processing transation triggered by ${trigger}`)
// do whatever needs to be done and then return an Observable
console.log(`Transation triggered by ${trigger} processing ......`)
return of(`Transation triggered by ${trigger} processed`)
}
transactionTriggers$.pipe(
concatMap(trigger => processTransation(trigger)),tap(console.log)
).subscribe()
您基本上从一个事件流开始,其中每个事件都应该触发交易的处理。
然后您使用 processTransaction
函数执行处理交易所需的任何操作。 processTransactio
需要返回一个 Observable,该 Observable 在事务处理完毕然后完成时发出处理结果。
然后在管道中,如果需要,您可以使用 tap
对处理结果做进一步的处理。
您可以尝试this stackblitz中的代码。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。