RxJava一位发布者的多个使用者

我正在编写某种带有缓存的中间件HTTP代理。工作流程是:

  1. 客户请求此代理获取资源
  2. 如果缓存中存在资源,则代理将其返回
  3. 如果未找到资源,则代理将获取远程资源并返回给用户。代理在加载数据时将此资源保存到缓存中。

我的接口具有Publisher<ByteBuffer>的远程资源流,接受Publisher<ByteBuffer>进行保存的缓存以及接受Publisher<ByteBuffer>作为响应的客户端连接:

// remote resource
interface Resource {
  Publisher<ByteBuffer> fetch();
}

// cache
interface Cache {
  Completable save(Publisher<ByteBuffer> data);
}

// clien response connection
interface Connection {
  Completable send(Publisher<ByteBuffer> data);
}

我的问题是,在将响应发送到客户端时,我需要延迟保存此字节缓冲区流以进行缓存,因此,客户端应负责以从远程请求ByteByffer块资源,不缓存

我尝试使用Publisher::cache方法,但这对我来说不是一个好选择,因为它将所有接收到的数据保留在内存中,这是不可接受的,因为缓存的数据可能只有几GB的大小。

作为一种解决方法,我创建了Subject,并填充了从Resource收到的下一个项目:

private final Cache cache;
private final Connection out;

Completable proxy(Resource res) {
  Subject<ByteBuffer> mirror = PublishSUbject.create();
  return Completable.mergeArray(
    out.send(res.fetch().doOnNext(mirror::onNext),cache.save(mirror.toFlowable(BackpressureStrategy.BUFFER))
  );
}

是否可以在不将项目缓存在内存中的情况下重用相同的Publisher,并且只有一个订阅者负责向发布者请求项目?

iCMS 回答:RxJava一位发布者的多个使用者

我可能丢失了一些内容(添加了关于我的Publisher接口版本的注释)。

但是..这就是我将在概念上做这样的事情的方式。

我将简化界面来处理Integers

// remote resource
interface Resource {
  ConnectableObservable<Integer> fetch();
}

// cache
interface Cache {
  Completable save(Integer data);
}

// client response connection
interface Connection {
  Completable send(Integer data);
}

我将使用Observable::publish创建一个ConnectableObservable并建立两个订阅:

@Test
public void testProxy()
{
    // Override schedulers:
    TestScheduler s = new TestScheduler();
    
    RxJavaPlugins.setIoSchedulerHandler(
            scheduler -> s );
    RxJavaPlugins.setComputationSchedulerHandler(
            scheduler -> s );
    
    // Mock interfaces:
    Resource resource = () -> Observable.range( 1,100 )
            .publish();
    
    Cache cache = data -> Completable.fromObservable( Observable.just( data )
                .delay( 100,TimeUnit.MILLISECONDS )
                .doOnNext( __ -> System.out.println( String.format( "Caching %d",data ))));
    
    Connection connection = data -> Completable.fromObservable( Observable.just( data )
                .delay( 500,TimeUnit.MILLISECONDS )
                .doOnNext( __ -> System.out.println( String.format( "Sending %d",data ))));
    
    // Subscribe to resource:
    ConnectableObservable<Integer> observable = resource.fetch();
    
    observable
        .observeOn( Schedulers.io() )
        .concatMapCompletable( data -> connection.send( data ))
        .subscribe();
    
    observable
        .observeOn( Schedulers.computation() )
        .concatMapCompletable( data -> cache.save( data ))
        .subscribe();
    
    observable.connect();
    
    // Simulate passage of time:
    s.advanceTimeBy( 10,TimeUnit.SECONDS );
}

输出:

Caching 1
Caching 2
Caching 3
Caching 4
Sending 1
Caching 5
Caching 6
Caching 7
Caching 8
Caching 9
Sending 2
Caching 10
. . . 

更新

根据您的评论,听起来对您而言,尊重背压很重要。

假设您在某个地方可以承受背压的Publisher,可以将其转换为Flowable,如下所示:

Flowable<T> flowable = Flowable.fromPublisher( publisher );

拥有Flowable后,您可以允许多个订阅者,而不必担心每个订阅者都必须向Publisher请求值(或其中一个订阅者在建立订阅时会丢失任何事件)。为此,您可以调用flowable.publish()创建一个ConnectableFlowable

enter image description here

ConnectableFlowable<T> flowable = Flowable.fromPublisher( publisher ).publish();
out.send(flowable);   // calls flowable.subscribe()
cache.save(flowable); // calls flowable.subscribe()
flowable.connect();   // begins emitting values
本文链接:https://www.f2er.com/2123737.html

大家都在问