在 rxJs 中挣扎于 flatMap 与 concatMap

如何解决在 rxJs 中挣扎于 flatMap 与 concatMap

我很难理解 rxJs 中 flatMapconcatMap 之间的区别。

我能理解的最明确的答案是这里difference-between-concatmap-and-flatmap

所以我自己去尝试了。

import "./styles.css";
import { switchMap,flatMap,concatMap } from "rxjs/operators";
import { fromFetch } from "rxjs/fetch";
import { Observable } from "rxjs";

function createObs1() {
  return new Observable<number>((subscriber) => {
    setTimeout(() => {
      subscriber.next(1);
      subscriber.complete();
    },900);
  });
}

function createObs2() {
  return new Observable<number>((subscriber) => {
    setTimeout(() => {
      subscriber.next(2);
      //subscriber.next(22);
      //subscriber.next(222);
      subscriber.complete();
    },800);
  });
}

function createObs3() {
  return new Observable<number>((subscriber) => {
    setTimeout(() => {
      subscriber.next(3);
      //subscriber.next(33);
      //subscriber.next(333);
      subscriber.complete();
    },700);
  });
}

function createObs4() {
  return new Observable<number>((subscriber) => {
    setTimeout(() => {
      subscriber.next(4);
      subscriber.complete();
    },600);
  });
}

function createObs5() {
  return new Observable<number>((subscriber) => {
    setTimeout(() => {
      subscriber.next(5);
      subscriber.complete();
    },500);
  });
}

createObs1()
  .pipe(
    flatMap((resp) => {
      console.log(resp);
      return createObs2();
    }),flatMap((resp) => {
      console.log(resp);
      return createObs3();
    }),flatMap((resp) => {
      console.log(resp);
      return createObs4();
    }),flatMap((resp) => {
      console.log(resp);
      return createObs5();
    })
  )
  .subscribe((resp) => console.log(resp));

console.log("hellooo");

我在这里用过那个游乐场playground example

问题

1) 根据我的理解,flatMap 的使用应该混合输出,以便控制台日志类似于 (1,3,2,4,5)。我已经尝试了 30 多次并且总是出现在同一行 (1,5)

我做错了什么或有什么不可告人的错误?

2) 如果在 createObs2()createObs3() 上删除注释并将代码包含在多个发出的事件中,那么事情就会变得一团糟。即使您更改为 concatMap 它也会使事情变得混乱并且结果喜忧参半。我期望只出现一次的多个数字多次出现。结果可以是 (1,33,22,5,5) 为什么会这样?

我如何在操场上测试示例。我只是从最后一个 console.log("hello") 中删除了 1 个字母。只有一个更改,例如 console.log("heloo") 然后观察到并再次编译项目并在控制台中打印输出。

编辑:我使用 flatMap 和 concatMap 的原因是为了使用 http 库在 angular 中找到嵌套订阅的替代品。

createObs1().subscribe( (resp1) => {
          console.log(resp1);
          createObs2().subscribe( (resp2) => {
               console.log(resp2);
               createObs3().subscribe( (resp3) => {
                   console.log(resp3);
                   createObs4().subscribe( (resp4) => {
                        console.log(resp4);
                        createObs5().subscribe( (resp5) => {
                            console.log(resp5);
                            })
                        })
                   })
               })
             })

解决方法

您的测试场景并不足以看出这两个运算符之间的差异。在您的测试用例中,每个 observable 只发出 1 次。如果 observable 只发出一个值,那么 concatMapflatMap又名 mergeMap)之间真的没有什么不同。只有在多次排放时才能看到差异。

所以,让我们使用不同的场景。让我们有一个 source$ observable,它只是每 1 秒发出一个递增的整数。然后,在我们的“Higher Order Mapping Operator”(concatMap & mergeMap)中,我们将返回一个每 1 秒发出可变次数的 observable,然后完成。

// emit number every second
const source$ = interval(1000).pipe(map(n => n+1)); 

// helper to return observable that emits the provided number of times
function inner$(max: number,description: string): Observable<string> {
  return interval(1000).pipe(
    map(n => `[${description}: inner source ${max}] ${n+1}/${max}`),take(max),);
}

然后让我们基于 source$inner$ 定义两个单独的 observable;一种使用 concatMap,一种使用 flatMap 并观察输出。

const flatMap$ = source$.pipe(
    flatMap(n => inner$(n,'flatMap$'))
);

const concatMap$ = source$.pipe(
    concatMap(n => inner$(n,'concatMap$'))
);

在查看输出的差异之前,让我们先谈谈这些运算符的共同点。他们俩:

  • 订阅传入函数返回的observable
  • 从这个“内部可观察对象”中发出排放
  • 取消订阅内部 observable

不同之处在于它们如何创建和管理内部订阅:

concatMap - 一次只允许一个内部订阅。当它接收到排放时,它一次只会订阅一个内部可观察对象。所以它最初会订阅“emission 1”创建的observable,只有在它完成后才会订阅“emission 2”创建的observable。这与 concat 静态方法的行为方式一致。

flatMap (aka mergeMap) - 允许许多内部订阅。因此,当接收到新的排放时,它将订阅内部可观察量。这意味着排放不会按任何特定顺序进行,因为它会在其任何内部可观察对象发出时发出。这与 merge 静态方法的行为方式一致(这就是我个人更喜欢名称“mergeMap”的原因)。

这是一个 StackBlitz,它显示了上述 observables concatMap$mergeMap$ 的输出: concatMap vs mergeMap

希望以上解释能帮助您解决问题!

#1 - “使用 flatMap 应该混合输出

这没有像您预期的那样工作的原因是因为只有一个发射通过 flatMap,这意味着您只有一个“内部可观察”发射值。如上例所示,一旦 flatMap 接收到多个发射,它就可以拥有多个独立发射的内部 observable。

#2 - “...并包含带有多个发出事件的代码,然后事情就会变得一团糟。

“事情变得一团糟”是因为有多个发出值的内部订阅。

对于您提到的关于使用 concatMap 并且仍然获得“混合”输出的部分,我不希望这样。当启用“自动保存”时,我在 StackBlitz 中看到了可观察到的异常行为(似乎有时它不会完全刷新,并且旧订阅似乎在自动刷新后仍然存在,这会导致控制台输出非常混乱 )。也许代码沙箱也有类似的问题。

#3 - “我使用 flatMap 和 concatMap 的原因是为了使用 http 库找到嵌套订阅的替代品

这是有道理的。您不想弄乱嵌套订阅,因为没有一个很好的方法来保证内部订阅会被清理。

在大多数使用 http 调用的情况下,我发现 switchMap 是理想的选择,因为它会减少您不再关心的内部 observable 的排放。假设您有一个从路由参数中读取 id 的组件。它使用此 id 进行 http 调用以获取数据。

itemId$ = this.activeRoute.params.pipe(
    map(params => params['id']),distinctUntilChanged()
);

item$ = this.itemId$.pipe(
    switchMap(id => http.get(`${serverUrl}/items/${id}`)),map(response => response.data)
);

我们希望 item$ 仅发出“当前项目”(对应于 url 中的 id)。假设我们的 UI 有一个按钮,用户可以点击以通过 id 导航到下一个项目,并且您的应用发现自己有一个喜欢点击的用户,他一直在粉碎该按钮,这比 http 调用更快地更改了 url 参数可以返回数据。

如果我们选择 mergeMap,我们最终会得到许多内部 observable,它们会发出所有这些 http 调用的结果。充其量,屏幕会随着所有这些不同的呼叫返回而闪烁。在最坏的情况下(如果调用出现乱序),UI 将显示与 url 中的 id 不同步的数据:-(

如果我们选择 concatMap,用户将被迫等待所有的 http 调用串行完成,即使我们只关心最近的那个。

但是,对于 switchMap,每当接收到新的发射 (itemId) 时,它将取消订阅前一个内部可观察对象并订阅新的。这意味着它永远不会发出不再相关的旧 http 调用的结果。 :-)

需要注意的一点是,由于 http observables 只发出一次,各种操作符(switchMapmergeMapconcatMap)之间的选择似乎没有什么区别,因为它们都为我们执行“内部可观察处理”。但是,最好让您的代码面向未来,并选择真正为您提供所需行为的代码,如果您开始接收多个发射。

,

每次第一个 observable 发出时,都会在 flatMap 中创建第二个 observable 并开始发出。但是,第一个 observable 的值不会再继续传递。

每次第二个 observable 发出时,下一个 flatMap 都会创建第三个 observable,依此类推。同样,进入 flatMap 的原始值不会进一步传递。

createObs1()
  .pipe(
    flatMap(() => createObs2()),// Merge this stream every time prev observable emits
    flatMap(() => createObs3()),// Merge this stream every time prev observable emits
    flatMap(() => createObs4()),// Merge this stream every time prev observable emits
    flatMap(() => createObs5()),// Merge this stream every time prev observable emits
  )
  .subscribe((resp) => console.log(resp));

// OUTPUT:
// 5

因此,只有从 createObs5() 发出的值才真正发送给观察者。先前 observable 发出的值刚刚触发了新 observable 的创建。

如果您使用 merge,那么您会得到您所期望的:

createObs1()
  .pipe(
    merge(createObs2()),merge(createObs3()),merge(createObs4()),merge(createObs5()),)
  .subscribe((resp) => console.log(resp));

// OUTPUT:
// 5
// 4
// 3
// 2
// 1

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


使用本地python环境可以成功执行 import pandas as pd import matplotlib.pyplot as plt # 设置字体 plt.rcParams[&#39;font.sans-serif&#39;] = [&#39;SimHei&#39;] # 能正确显示负号 p
错误1:Request method ‘DELETE‘ not supported 错误还原:controller层有一个接口,访问该接口时报错:Request method ‘DELETE‘ not supported 错误原因:没有接收到前端传入的参数,修改为如下 参考 错误2:cannot r
错误1:启动docker镜像时报错:Error response from daemon: driver failed programming external connectivity on endpoint quirky_allen 解决方法:重启docker -&gt; systemctl r
错误1:private field ‘xxx‘ is never assigned 按Altʾnter快捷键,选择第2项 参考:https://blog.csdn.net/shi_hong_fei_hei/article/details/88814070 错误2:启动时报错,不能找到主启动类 #
报错如下,通过源不能下载,最后警告pip需升级版本 Requirement already satisfied: pip in c:\users\ychen\appdata\local\programs\python\python310\lib\site-packages (22.0.4) Coll
错误1:maven打包报错 错误还原:使用maven打包项目时报错如下 [ERROR] Failed to execute goal org.apache.maven.plugins:maven-resources-plugin:3.2.0:resources (default-resources)
错误1:服务调用时报错 服务消费者模块assess通过openFeign调用服务提供者模块hires 如下为服务提供者模块hires的控制层接口 @RestController @RequestMapping(&quot;/hires&quot;) public class FeignControl
错误1:运行项目后报如下错误 解决方案 报错2:Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.1:compile (default-compile) on project sb 解决方案:在pom.
参考 错误原因 过滤器或拦截器在生效时,redisTemplate还没有注入 解决方案:在注入容器时就生效 @Component //项目运行时就注入Spring容器 public class RedisBean { @Resource private RedisTemplate&lt;String
使用vite构建项目报错 C:\Users\ychen\work&gt;npm init @vitejs/app @vitejs/create-app is deprecated, use npm init vite instead C:\Users\ychen\AppData\Local\npm-
参考1 参考2 解决方案 # 点击安装源 协议选择 http:// 路径填写 mirrors.aliyun.com/centos/8.3.2011/BaseOS/x86_64/os URL类型 软件库URL 其他路径 # 版本 7 mirrors.aliyun.com/centos/7/os/x86
报错1 [root@slave1 data_mocker]# kafka-console-consumer.sh --bootstrap-server slave1:9092 --topic topic_db [2023-12-19 18:31:12,770] WARN [Consumer clie
错误1 # 重写数据 hive (edu)&gt; insert overwrite table dwd_trade_cart_add_inc &gt; select data.id, &gt; data.user_id, &gt; data.course_id, &gt; date_format(
错误1 hive (edu)&gt; insert into huanhuan values(1,&#39;haoge&#39;); Query ID = root_20240110071417_fe1517ad-3607-41f4-bdcf-d00b98ac443e Total jobs = 1
报错1:执行到如下就不执行了,没有显示Successfully registered new MBean. [root@slave1 bin]# /usr/local/software/flume-1.9.0/bin/flume-ng agent -n a1 -c /usr/local/softwa
虚拟及没有启动任何服务器查看jps会显示jps,如果没有显示任何东西 [root@slave2 ~]# jps 9647 Jps 解决方案 # 进入/tmp查看 [root@slave1 dfs]# cd /tmp [root@slave1 tmp]# ll 总用量 48 drwxr-xr-x. 2
报错1 hive&gt; show databases; OK Failed with exception java.io.IOException:java.lang.RuntimeException: Error in configuring object Time taken: 0.474 se
报错1 [root@localhost ~]# vim -bash: vim: 未找到命令 安装vim yum -y install vim* # 查看是否安装成功 [root@hadoop01 hadoop]# rpm -qa |grep vim vim-X11-7.4.629-8.el7_9.x
修改hadoop配置 vi /usr/local/software/hadoop-2.9.2/etc/hadoop/yarn-site.xml # 添加如下 &lt;configuration&gt; &lt;property&gt; &lt;name&gt;yarn.nodemanager.res