我可能丢失了一些内容(添加了关于我的Publisher
接口版本的注释)。
但是..这就是我将在概念上做这样的事情的方式。
我将简化界面来处理Integers
:
// remote resource
interface Resource {
ConnectableObservable<Integer> fetch();
}
// cache
interface Cache {
Completable save(Integer data);
}
// client response connection
interface Connection {
Completable send(Integer data);
}
我将使用Observable::publish
创建一个ConnectableObservable
并建立两个订阅:
@Test
public void testProxy()
{
// Override schedulers:
TestScheduler s = new TestScheduler();
RxJavaPlugins.setIoSchedulerHandler(
scheduler -> s );
RxJavaPlugins.setComputationSchedulerHandler(
scheduler -> s );
// Mock interfaces:
Resource resource = () -> Observable.range( 1,100 )
.publish();
Cache cache = data -> Completable.fromObservable( Observable.just( data )
.delay( 100,TimeUnit.MILLISECONDS )
.doOnNext( __ -> System.out.println( String.format( "Caching %d",data ))));
Connection connection = data -> Completable.fromObservable( Observable.just( data )
.delay( 500,TimeUnit.MILLISECONDS )
.doOnNext( __ -> System.out.println( String.format( "Sending %d",data ))));
// Subscribe to resource:
ConnectableObservable<Integer> observable = resource.fetch();
observable
.observeOn( Schedulers.io() )
.concatMapCompletable( data -> connection.send( data ))
.subscribe();
observable
.observeOn( Schedulers.computation() )
.concatMapCompletable( data -> cache.save( data ))
.subscribe();
observable.connect();
// Simulate passage of time:
s.advanceTimeBy( 10,TimeUnit.SECONDS );
}
输出:
Caching 1
Caching 2
Caching 3
Caching 4
Sending 1
Caching 5
Caching 6
Caching 7
Caching 8
Caching 9
Sending 2
Caching 10
. . .
更新
根据您的评论,听起来对您而言,尊重背压很重要。
假设您在某个地方可以承受背压的Publisher
,可以将其转换为Flowable
,如下所示:
Flowable<T> flowable = Flowable.fromPublisher( publisher );
拥有Flowable
后,您可以允许多个订阅者,而不必担心每个订阅者都必须向Publisher
请求值(或其中一个订阅者在建立订阅时会丢失任何事件)。为此,您可以调用flowable.publish()
创建一个ConnectableFlowable
。
ConnectableFlowable<T> flowable = Flowable.fromPublisher( publisher ).publish();
out.send(flowable); // calls flowable.subscribe()
cache.save(flowable); // calls flowable.subscribe()
flowable.connect(); // begins emitting values
本文链接:https://www.f2er.com/2123737.html