如何解决使用 rxjs 在 redux-observable 史诗中具有有限并发性的并行 HTTP 请求
一直试图解决这个问题。 目前我有一个非常大的对象数组(我称之为瓷砖)。 我有一个 API 端点,我应该一个一个地发送这些对象,这个 API 什么都不返回,只返回状态。 我需要以并行和并发的方式将此对象发送到端点,当它们中的最后一个成功时,我应该发出一些字符串值,该值会进入 redux 存储。
const tilesEpic =(action$,_state$) => {
action$.pipe(
ofType('TILE_ACTION'),map(tilesArray => postTilesConcurrently(tilesArray),map(someId => someReduxAction(someId),)
const postTilesConcurrently = (tilesArray) => {
const tilesToObservables = tilesArray.map(tile => defer(() => postTile(tile))
return from(tiletoObservables).pipe(mergeAll(concurrencyLimit))
}
问题是我不知道如何从 someId
发出 postTilesConcurrently
,现在它在每个请求完成后触发操作。
解决方法
mergeAll()
将并行订阅所有源,但它也会立即发出每个结果。因此,您可以使用例如 forkJoin()
(
您也可以使用 toArray()
运算符)。
forkJoin(tilesToObservables)
.pipe(
map(results => results???),// Get `someId` somehow from results
);
forkJoin()
将在所有源 Observable 至少发出一次并完成后发出一次。这意味着对于每个源 Observable,你只会得到它发出的最后一个值。
在 Martin 的回复之后,我调整了我的代码以使用 forkJoin
const tilesEpic =(action$,_state$) => {
action$.pipe(
ofType('TILE_ACTION'),concatMap(tilesArray => postTilesConcurrently(tilesArray),map(({someId}) => someReduxAction(someId),)
const postTilesConcurrently = (tilesArray) => {
const tilesToObservables = tilesArray.map(tile => defer(() => postTile(tile))
return forkJoin({
images: from(tileToObservables).pipe(mergeAll(concurrencyLimit)),someId: from([someId]),}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。