如何解决React Actions 使用 RxJS 进行并行异步调用
我有一个现有操作,该操作使用 ID 列表作为查询字符串参数调用 api,并获取传递的 ID 的缩略图响应。在某些情况下,ID 的数量可以是 150-200 条记录。因此,我正在使用 RxJs 的 forkJoin
和 subscribe
方法使此 API 调用基于批处理并并行运行。我在实施方面面临以下 3 个问题。
-
maxParallelQueries 似乎没有按预期工作。所有的 fetchThumbnailObservables() 都是一次性执行的,而不是设置为常量的 3 个批次/
-
如何处理 observable 的返回,以便 mergeMap 不给出语法错误。
Argument of type '({ payload }: IAction) => void' is not assignable to parameter of type '(value: IAction,index: number) => ObservableInput<any>'. Type 'void' is not assignable to type 'ObservableInput<any>'.ts(2345)
这是我当前的代码。任何帮助都会很好。
const getThumbnails: Epic<IAction,IAction,IStoreState> = (action$,state$) =>
action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
mergeMap( ({ payload }) => {
const assetIds = Object.keys(payload);
const Meta = payload;
const batchSize = 10;
const maxParallelQueries = 3;
const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];
for (let count = 0; count < assetIds.length; count += batchSize) {
fetchThumbnailObservables.push(AssetDataService.fetchThumbnailData(assetIds.slice(count,count + batchSize)));
}
forkJoin(fetchThumbnailObservables)
.pipe(mergeMap(fetchThumbnailObservable => fetchThumbnailObservable,maxParallelQueries)) // Issue 1 : maxParallelQueries limit is not working
.subscribe({
complete: () => { },error: () => { },next: (response) => {
// Issue 2 : below action dont seems to be getting invoked
actions.AssetActions.receivedThumbnailsAction(response,Meta)
},});
// Issue 3 : How to handle return of observable
// Added below code for prevennting error raised by mergeMap( ({ payload }) => {
return new Observable<any>();
}),catchError((error) => of(actions.Common.errorOccured({ error })))
);
```
解决方法
我不知道 Epics,但我想我可以给你一些关于如何解决这个问题的提示。
让我们从这样一个事实开始:您有一个 id 数组 assetIds
,并且您希望以受控的并发级别 maxParallelQueries
为每个 id 进行 http 调用。
在这种情况下,from
函数和 mergeMap
运算符是您的朋友。
你需要做的是
// from function transform an array of values,in this case,to a stream of values
from(assetIds).pipe(
mergeMap(id => {
// do whatever you need to do and eventually return an Observable
return AssetDataService.fetchThumbnailData(id)
},maxParallelQueries) // the second parameter of mergeMap is the concurrency
)
通过这种方式,您应该能够调用 http 调用并保持您已建立的并发限制
,- 我能够在下一个订阅方法中获得基于批处理的响应。但是,在此函数中调用的操作以及响应并未被调用。
你不能在 redux-observable 中强制调用一个动作。相反,您需要对它们进行排队。
actions.AssetActions.receivedThumbnailsAction(response,meta)
将返回形状为 {type: "MY_ACTION",payload: ...}
的普通对象,而不是分派动作。您宁愿返回包装在 mergeMap
内的 observable 中的对象,以便将下一个操作添加到队列中。 (见下面的解决方案)
- maxParallelQueries 似乎没有按预期工作。所有 fetchThumbnailObservables() 都立即执行,而不是按设置为常量的 3 个批次/
第一猜测:AssetDataService.fetchThumbnailData(assetIds.slice(count,count + batchSize))
的返回类型是什么?如果它是 Promise
类型,那么 maxParallelQueries
无效,因为承诺是急切的并且在创建时立即开始,因此在 mergeMap
有机会控制请求之前。因此,请确保 fetchThumbnailData
返回一个可观察对象。如果您需要将 Promise 转换为 Observable 以确保 Promise 不会在 defer
内过早地触发,请使用 from
而不是 fetchThumbnailData
。
第二个猜测:forkJoin
正在触发所有请求,因此 mergeMap
甚至没有机会限制并行查询的数量。我很惊讶打字稿没有在这里警告您 mergeMap(fetchThumbnailObservable => fetchThumbnailObservable,...)
中的返回类型不正确。我下面的解决方案将 forJoin
替换为 from
,并且应该启用您的 maxParallelQueries
。
- 如何处理 observable 的返回,以便 mergeMap 不给出语法错误。 '({ payload }: IAction) => void' 类型的参数不可分配给类型 '(value: IAction,index: number) => ObservableInput' 的参数。类型 'void' 不可分配给类型 'ObservableInput'.ts(2345)
这不是语法错误。在史诗中,您可以选择返回一个空的 observable return empty()
,如果没有后续动作,或者通过返回 IAction
类型的 observable 来分派另一个动作。在这里,我(未经测试)尝试满足您的要求:
const getThumbnails: Epic<IAction,IAction,IStoreState> = (action$,state$) =>
action$.ofType(ActionTypes.AssetActions.DATA.FETCH_THUMBNAILS).pipe(
mergeMap( ({ payload }) => {
const assetIds = Object.keys(payload);
const meta = payload;
const batchSize = 10;
const maxParallelQueries = 3;
const fetchThumbnailObservables : Observable<IThumbnailResponse>[] = [];
for (let count = 0; count < assetIds.length; count += batchSize) {
fetchThumbnailObservables.push(
AssetDataService.fetchThumbnailData(
assetIds.slice(count,count + batchSize)
)
);
}
return from(fetchThumbnailObservables)
.pipe(
mergeMap(
fetchThumbnailObservable => fetchThumbnailObservable,maxParallelQueries
),map((response) =>
// dispatch action for each response
actions.AssetActions.receivedThumbnailsAction(response,meta)
),// catch error early so epic continue on error
catchError((error) => of(actions.Common.errorOccured({ error })))
);
})
);
请注意,我将 catchError
移到更靠近请求的位置(在内部 forkJoin Observable 内),以便在发生错误时史诗不会终止。 Error Handling - redux observable 之所以如此,是因为当 catchError
处于活动状态时,它会用 catchError
返回的可观察对象替换失败的可观察对象。而且我假设您不想在出现错误时替换整个史诗?
还有一件事:redux-observable 为您处理订阅,因此无需在您的应用程序中的任何位置调用 .subscribe
或 .unsubscribe
。你只需要关心 observables 的生命周期。这里最重要的是要知道一个 observable 什么时候开始,当它完成时会发生什么,它什么时候完成或者当一个 observable 抛出时会发生什么。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。