RxJs-结合两个可观察对象,结果选择器功能,zip

我正在处理一些旧代码,但设备状态更新错误有一个问题。问题是,旧的旧版状态更新是使用长轮询完成的,并且BE上存在并发问题,该问题不会得到解决(不推荐使用BE)。有时它忘记更新状态。消息到达,但不正确。

我可以从新的BE中检索请求它的正确状态(该应用程序现在是令人讨厌的混合动力),但我无法正确执行。我的第一个想法是使用whitLatestFrom,但由于明显的原因而无法使用。

legacyService.getState<LegacyState>()
  .pipe(
    subscribeon(queue),withLatestFrom(this.realService.getRealStatus()),// request to the new BE
    map(([legacy,real]) => {
      // This if statement is a must. getRealStatus() returns only partial state,not the whole DeviceState
      // If there is a difference in the state from two sources,I create a new object with correct state
      if (legacy.state === DeviceState.INITIALIZED && !real.stable) {
        return new LegacyState(
          legacy.instrument,legacy.db,DeviceState.UNSTABLE,);
      }
      return event; // states are same,returning state from legacy source
    })
  )
  .subscribe(event => {
    this._deviceStateUpdated$.next(event);
  });

可以在应用重新启动/重新加载时使用,但是以后real状态不会更新,因为不会进行任何新调用,它只会返回以前的值。 combineLatest也会发生同样的情况。第一个(来自轮询)已更新,第二个仅返回先前的值。

问题是:我如何以这样的方式组合两个可观察对象,即在更新第一个可观察对象时,我也强制更新第二个可观察对象的新值?当然,我能够处理它们两者,因为第二个可观察对象仅返回部分状态。我尝试了多张地图(swtichMap,concatMap,...),但没有成功。

iCMS 回答:RxJs-结合两个可观察对象,结果选择器功能,zip

我所说的投影函数更准确地称为结果选择器函数。请参见this page上的示例3。我将使用的基本结构是这样:

import { interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';

//whatever your long polling interval is
const polling = interval(10000);

//then use that to collect your states 
const collectState = polling
    //pipe the interval event
   .pipe(
      //first you need the real state so you switchMap into that
      //I'm assuming this is Rx can treat as an observable,like a promise
      switchMap(this.realService.getRealStatus()),//then you have your real status and you need to use a result selector function
      switchMap(() => 
         legacyService.getState(),//at this point in the result selector function you have access to both states
         (real,legacy) => {
             //so you can apply your logic
             if (legacy.state === DeviceState.INITIALIZED && !real.stable) {
                 return new LegacyState(
                    legacy.instrument,legacy.db,DeviceState.UNSTABLE
                );
             } else { return legacy }
      })
    );
  

我意识到我无助地改变了真实/旧有顺序,但是您明白了要点。

执行此操作的另一种方法是创建一个可观察到的间隔和一个可压缩的可观察到的间隔,该间隔仅在真实状态和旧状态都已发出时才发出

import { zip,interval } from 'rxjs';
import { switchMap,map } from 'rxjs/operators';

const polling = interval(10000);
const states = zip(
    legacyService.getState(),this.realService.getRealStatus()
);
const collectState = polling
    .pipe(
        switchMap(states),map(statesArray => {
           //do the work here
        })
    );

);

希望这里有帮助

,

使用@Tom的方法作为灵感,我得以解决它。尽管我直接将switchMap用管道传输到旧版服务上,而不是用轮询间隔来传输。

legacyService.getState<LegacyState>()
  .pipe(
    subscribeOn(queue),switchMap(() =>
      this.realService.getRealStatus(),// request to the new BE
      (legacy,real) => {
        if (legacy.state === DeviceState.INITIALIZED && !real.stable) {
          return new LegacyState(
            legacy.instrument,DeviceState.UNSTABLE,);
        }
        return event; // states are same,returning state from legacy source
      }
    )
  .subscribe(event => {
    this._deviceStateUpdated$.next(event);
  });
本文链接:https://www.f2er.com/2132075.html

大家都在问