微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

为什么 forkJoin 对一系列新的 Observable 没有完成?

如何解决为什么 forkJoin 对一系列新的 Observable 没有完成?

我正在尝试在一些异步任务上使用 forkjoin,但由于某种原因我没有在 forkjoin 上得到发射。

见stackblitz https://stackblitz.com/edit/angular-12-form-validation-yrstmk?devtoolsheight=33&file=src/app/app.component.ts

import { forkJoin,Observable,ReplaySubject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

@Component({
  selector: 'my-app',templateUrl: './app.component.html',styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  db: ReplaySubject<IDBDatabase> = new ReplaySubject(1);

  constructor() {}

  ngOnInit(): void {
    console.log('init!');

    setTimeout(() => {
      console.log('db init!');
      this.db.next(null);
    },2000);

    const test = [this.create(null),this.create(null),this.create(null)];
    forkJoin(test).subscribe(() => {
      console.log('forkjoined!');
    });
  }

  create(variable: any): Observable<any> {
    return this.db.pipe(
      switchMap(
        () =>
          new Observable<any>(observer => {
            setTimeout(() => {
              console.log('next!');
              observer.next(variable);
              observer.complete();
            },1000);
          })
      )
    );
  }
}

解决方法

答案是外部 observable 是一个不会完成的 private static AppiumDriver<MobileElement> driver; public AppiumDriver<MobileElement> getDriver() throws IOException { if (PLATFORM_NAME.equals("Android")) { driver = new AndroidDriver<MobileElement>(new URL("http://127.0.0.1:4723/wd/hub"),capabilities); } else if (PLATFORM_NAME.equals("iOS")) { driver = new IOSDriver<MobileElement>(new URL("http://127.0.0.1:4723/wd/hub"),capabilities); } return driver; }

我只是在我的管道中添加了 ReplaySubject

take(1)
,

我刚刚更新了你的 stackblitz。:https://stackblitz.com/edit/angular-12-form-validation-adupub?file=src/app/app.component.ts

你必须:

  • 添加一个变量以在订阅时访问它(此处)。
  • 在下一个返回一个值。

检查差异:
您的代码:

// line 30
forkJoin(test).subscribe(() => {
   console.log('forkjoined!');
});
//line 42
switchMap(
        () =>
          new Observable<any>(observer => {
            console.log('next!');
            observer.next(variable);
            observer.complete();
          })
      )

我的代码:

//line 30
forkJoin(test).subscribe((data) => {
   console.log('forkjoined!',data);
});
//Line 42
switchMap(
        () =>
          new Observable<any>(observer => {
            console.log('next!');
            observer.next("variable");
            observer.complete();
          })
      )

enter image description here

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。