微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

当 RxJS observable 完成时,我如何执行异步代码?

如何解决当 RxJS observable 完成时,我如何执行异步代码?

我想在 observable 完成时执行代码。在我的代码中,我执行这个:

  compact(): Observable<FileManifest> {
    return this.loadindex().pipe(
      mergeMap((index) => index.walk()),map((entry) => entry.manifest),notUndefined(),writeallMessages(this.newPath,ProtoFileManifest),finalize(async () => {
        await Promise.all([
          promises.rm(this.journalPath,{ force: true }),promises.rm(this.manifestPath,]);
        await promises.rename(this.newPath,this.manifestPath);
      }),);
  }

问题是finalize方法是为同步代码做的。当我像上面那样执行异步代码时,代码将独立于订阅执行。

我希望在处理可观察对象的资源时执行此操作,但我希望在订阅时始终收到该事件。

如何在 finalize 方法中放置异步代码

谢谢 乌尔里希

解决方法

一种方法是创建三个 observables 而不是试图全部完成 在一。每个将在您想要的顺序异步链中组成一个链接 制作。

为了使基于 Promise 的 observable 中的副作用变得懒惰,我们使用 defer。 请注意,延迟回调的返回值可以是可观察的,也可以是 “ObservableInput”,这就是 RxJS 调用它知道如何转换的值 变成可观察的。这个值可以是(除其他外)一个承诺。

({
  compact(): Observable<FileManifest> {
    const writeToTempManifest$ = this.loadIndex().pipe(
      mergeMap((index) => index.walk()),map((entry) => entry.manifest),notUndefined(),writeAllMessages(this.newPath,ProtoFileManifest)
    );

    const removeOldManifest$ = defer(() =>
      Promise.all([
        promises.rm(this.journalPath,{ force: true }),promises.rm(this.manifestPath,])
    );

    const renameNewManifest$ = defer(() =>
      promises.rename(this.newPath,this.manifestPath)
    );

    return from([
      writeToTempManifest$,removeOldManifest$,renameNewManifest$,]).pipe(concatAll());
  },});

请注意,这些 observable 中的每一个都可能会发出一些信息(尽管我不熟悉 API)。第一个发出 writeAllMessages 运算符所做的任何事情,而第二个和第三个发出各自承诺的解析值。在第二个的情况下,这是一个来自 Promise.all 的双元素数组。

如果你想抑制一个 observable 发出的值,同时在它完成之前仍然保持打开状态,你可以创建一个操作符来做到这一点:

const silence = pipe(concatMapTo(EMPTY));

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。