微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 rxjs 在 redux-observable 史诗中具有有限并发性的并行 HTTP 请求

如何解决使用 rxjs 在 redux-observable 史诗中具有有限并发性的并行 HTTP 请求

一直试图解决这个问题。 目前我有一个非常大的对象数组(我称之为瓷砖)。 我有一个 API 端点,我应该一个一个地发送这些对象,这个 API 什么都不返回,只返回状态。 我需要以并行和并发的方式将此对象发送到端点,当它们中的最后一个成功时,我应该发出一些字符串值,该值会进入 redux 存储。

const tilesEpic =(action$,_state$) => {
    action$.pipe(
        ofType('TILE_ACTION'),map(tilesArray => postTilesConcurrently(tilesArray),map(someId => someReduxAction(someId),)

const postTilesConcurrently = (tilesArray) => {
     const tilesToObservables = tilesArray.map(tile => defer(() => postTile(tile))
     return from(tiletoObservables).pipe(mergeAll(concurrencyLimit))
     }

问题是我不知道如何从 someId 发出 postTilesConcurrently,现在它在每个请求完成后触发操作。

解决方法

mergeAll() 将并行订阅所有源,但它也会立即发出每个结果。因此,您可以使用例如 forkJoin() ( 您也可以使用 toArray() 运算符)。

forkJoin(tilesToObservables)
  .pipe(
    map(results => results???),// Get `someId` somehow from results
  );

forkJoin() 将在所有源 Observable 至少发出一次并完成后发出一次。这意味着对于每个源 Observable,你只会得到它发出的最后一个值。

,

在 Martin 的回复之后,我调整了我的代码以使用 forkJoin

const tilesEpic =(action$,_state$) => {
    action$.pipe(
        ofType('TILE_ACTION'),concatMap(tilesArray => postTilesConcurrently(tilesArray),map(({someId}) => someReduxAction(someId),)

const postTilesConcurrently = (tilesArray) => {
     const tilesToObservables = tilesArray.map(tile => defer(() => postTile(tile))
     return forkJoin({
      images: from(tileToObservables).pipe(mergeAll(concurrencyLimit)),someId: from([someId]),}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。