RXJS flatMap可重复观察

我正在尝试实现服务,无论应用程序是否与我的服务器建立连接,该服务都可观察到,因此当浏览器在线时,我们使用计时器ping服务器。这是代码:

@Html.DisplayFor(modelItem => item.StartDate.ToLongDateString())

public get $connected(): Observable<boolean> { return this.hasInternetConnection .asObservable() .pipe( distinctUntilChanged(),flatMap((connected: boolean) => { if (!connected) { return of(connected); } else { return timer(5000) .pipe( map(() => { var success = Math.random() > 0.5; console.log('PING: ' + success); return success; }) ); } }) ); } 只是绑定到窗口hasInternetConnectiononline事件的BehaviorSubject,计时器模拟对我的API服务器的ping。

问题是我的订阅offline仅从可观察到的计时器中捕获第一个值,然后不起作用。在$connected主题更改为hasInternetConnection并返回到false之后,我的订阅再次获得第一个值,然后什么都没有。这是我在控制台中看到的内容:

true

我该如何解决?谢谢!

zzzyq 回答:RXJS flatMap可重复观察

完整解决方案:

  private hasInternetConnection: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(navigator.onLine);
  private connectedSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(true);
  private recheckConnectionSubject: Subject<void> = new Subject<void>();

  constructor(
    private readonly http: HttpClient,) {
    fromEvent(window,'online')
      .pipe(takeUntil(this.destroyed))
      .subscribe(() => {
        this.hasInternetConnection.next(true);
      });
    fromEvent(window,'offline')
      .pipe(takeUntil(this.destroyed))
      .subscribe(() => {
        this.hasInternetConnection.next(false);
      });
    merge(
      this.hasInternetConnection,this.recheckConnectionSubject,)
      .pipe(
        mapTo(this.hasInternetConnection.value),switchMap((connected: boolean) => {
          if (!connected) {
            return of(connected);
          } else {
            return timer(0,30000)
              .pipe(
                mergeMapTo(this.http.get(`${environment.apiRoot}/ping`,{ responseType: 'text' })
                               .pipe(
                                 map((res) => {
                                   return true;
                                 }),catchError(() => {
                                   return of(false);
                                 })
                               )
                ),);
          }
        })
      )
      .subscribe(this.connectedSubject);
  }

  public get $connected(): Observable<boolean> {
    return this.connectedSubject.asObservable()
               .pipe(
                 distinctUntilChanged(),);
  }

  public resetTimer(): void {
      this.recheckConnectionSubject.next();
  }
本文链接:https://www.f2er.com/3131881.html

大家都在问