RxJava:并行处理并合并结果(不弄乱顺序)

我对使用反应流非常陌生,遇到了以下问题,这使我很难解决。目的是从MongoDB数据库中获取许多文档。对于每个文档,请从db中获取元数据,并从db中获取文件(示例代码中尚未提供)。然后,我们需要将所有数据上传到s3(将所有三个项目组合在一起)。但是,我坚持合并不同的发布者而不会弄乱元素的顺序。

Publisher<Document> p = versionCollection.find();
ConnectableFlowable<Document> version = Flowable.fromPublisher(p).publish();

Observable<GridFSFile> gridFS = version
        .map(extractID())
        .flatMap(loadGridFSFile()).toObservable();

Observable c = version.toObservable()
        .zipWith(gridFS,(Document v,GridFSFile f) -> {

            // if I check here if both messages belong together,the order sometimes is messed up  
            return v;
        });
version.connect();

因此,基本上,我试图将事件发布到两个不同的路径,一个路径从GridFS获取元数据,然后尝试再次合并这两个路径(以便可以将初始文档与元数据一起访问)。但是,我注意到有时事件会以不同的顺序压缩(可能是因为对db的查询有时会花费不同的时间)。

每个事件的执行路径应如下

         v
         |
  /      |      \
v    query db   query db
  \      |      /
   upload aggregate
   of all 3 elements

从本质上讲,问题是,使用我的方法,最终得到的结果是来自先前查询的另一个元素v的结果。以后,我可能需要确保执行路径在一个输入的所有3个路径之间同步发生元素一次,但我不知道如何。

编辑

我终于找到了一种似乎可以满足需要的方法。但是,以并行方式处理并确保它们保持同步似乎很复杂

Publisher<Document> p = versionCollection.find();

Observable<Document> version = Observable.fromPublisher(p);
version.flatMap(v -> {

    ConnectableObservable<Document> connectableObservable = Observable.just(v).replay();

    Observable o = connectableObservable
        .map(extractAudioID())
        .flatMap(loadGridFSFile(audioBucket));

    Observable o3 = connectableObservable.zipWith(o,(Document a,GridFSFile f) -> {
            // now everything seems to stay in order here
            // and we can combine both results
    });
    o3.subscribe();
    o.subscribe();

    Disposable a = connectableObservable.connect();
    return connectableObservable;
},1).blockingSubscribe();


static Function<ObjectId,ObservableSource<GridFSFile>> loadGridFSFile(GridFSBucket audioBucket) {
    return id -> Observable.fromPublisher(audioBucket.find(new Document("_id",id)).first());
}
gzdvd 回答:RxJava:并行处理并合并结果(不弄乱顺序)

一些似乎可以解决问题的东西:

现在这段代码看起来更加合理:

ConnectableFlowable<Document> version = Flowable.fromPublisher(p).replay();

Flowable<GridFSFile> file = version
        .map(extractID())
        .concatMap(loadGridFSFile(audioBucket));

Flowable<GridFSDownloadStream> data = version
        .map(extractID())
        .map(loadGridFSData(audioBucket));

Flowable c = Flowable.zip(version,file,data,(v,f,d) -> {
    // so far everything seems to stay in order
    return v;
});

version.connect();

c.subscribe();
本文链接:https://www.f2er.com/3152271.html

大家都在问