android – 链接Retrofit服务w/RxJava支持

前端之家收集整理的这篇文章主要介绍了android – 链接Retrofit服务w/RxJava支持前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我使用retrofit的Rx Java支持链接observable时遇到了麻烦.我可能误解了如何使用它,否则它可能是改造中的一个错误.希望有人在这里可以帮助我了解正在发生的事情.编辑:我正在使用MockRestAdapter进行这些响应 – 这可能是相关的,因为我看到RxSupport实现略有不同.

这是一个虚假的银行应用程序.它正在尝试进行转移,并且在转移完成后,它应该执行帐户请求以更新帐户值.这基本上只是我尝试使用flatMap的借口.遗憾的是,以下代码无效,订阅者无法收到通知

案例1:链接两个改造后的可观测量

转移服务(注意:返回改造后产生的观察结果):

  1. @FormUrlEncoded @POST("/user/transactions/")
  2. public Observable<TransferResponse> transfer(@Field("session_id") String sessionId,@Field("from_account_number") String fromAccountNumber,@Field("to_account_number") String toAccountNumber,@Field("amount") String amount);

帐户服务(注意:返回改进生成的observable):

  1. @FormUrlEncoded @POST("/user/accounts")
  2. public Observable<List<Account>> getAccounts(@Field("session_id") String sessionId);

将两个改造后的观测链连在一起:

  1. transfeRSService.transfer(session.getSessionId(),fromAccountNumber,toAccountNumber,amount)
  2. .flatMap(new Func1<TransferResponse,Observable<? extends List<Account>>>() {
  3. @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
  4. return accountsService.getAccounts(session.getSessionId());
  5. }
  6. })
  7. .subscribeOn(Schedulers.io())
  8. .observeOn(AndroidSchedulers.mainThread());

案例2:创建我自己的可观察和链接与改造生产的

如果我忽略了“平面映射”调用中Retrofit内置的Rx支持,它可以完美运行!所有订阅者都会收到通见下文:

新帐户服务(注意:不会产生可观察的):

  1. @FormUrlEncoded @POST("/user/accounts")
  2. public List<Account> getAccountsBlocking(@Field("session_id") String sessionId);

创建我自己的observable并自己发出项目:

  1. transfeRSService.transfer(session.getSessionId(),Observable<? extends List<Account>>>() {
  2. @Override public Observable<? extends List<Account>> call(TransferResponse transferResponse) {
  3. return Observable.create(new Observable.OnSubscribe<List<Account>>() {
  4. @Override public void call(Subscriber<? super List<Account>> subscriber) {
  5. List<Account> accounts = accountsService.getAccountsBlocking(session.getSessionId());
  6. subscriber.onNext(accounts);
  7. subscriber.onCompleted();
  8. }
  9. });
  10. }
  11. })
  12. .subscribeOn(Schedulers.io())
  13. .observeOn(AndroidSchedulers.mainThread());

任何帮助将不胜感激!

解决方法

答案是肯定的,你应该能够从Retrofit链接observables. MockRestAdapter $MockRxSupport:createMockObservable私有类中似乎存在一个错误.关于将订阅订阅到可观察者的调度方式似乎是错误的.在HttpExecutor线程本身启动之后,订阅了observable.我相信来自您的Schedulers.io()线程的原始流程已经完成并取消订阅,之后可以订阅mockHandler.invokeSync返回的Observable.如果你看一下retrofit-mock模块中的代码,希望这种解释有一些意义.

作为使用retrofit-mock时当前代码解决方法,您只能将内部默认Executor替换为您自己的ImmediateExecutor实现.这至少在测试模拟时有一个由Scheduler.io提供的单个线程流.

  1. // ImmediateExecutor.java
  2. public class ImmediateExecutor implements Executor {
  3. @Override
  4. public void execute(Runnable command) {
  5. command.run();
  6. }
  7. }
  8.  
  9. // Create your RestAdapter with your ImmdiateExecutor
  10. RestAdapter adapter = new RestAdapter.Builder()
  11. .setEndpoint(endpoint)
  12. .setExecutors(new ImmediateExecutor(),null)
  13. .build();

至于在源头修复问题,您还可以在项目中包含retrofit-mock项目作为源代码,并使用下面的代码修改MockRestAdapter $MockRxSupport:createMockObservable方法.我已经测试了你的用例,它确实解决了这个问题.

— MockRestAdapter.java $MockRxSupport —-

  1. Observable createMockObservable(final MockHandler mockHandler,final RestMethodInfo methodInfo,final RequestInterceptor interceptor,final Object[] args) {
  2. return Observable.create(new Observable.OnSubscribe<Object>() {
  3. @Override public void call(final Subscriber<? super Object> subscriber) {
  4. try {
  5. if (subscriber.isUnsubscribed()) return;
  6. Observable observable =
  7. (Observable) mockHandler.invokeSync(methodInfo,interceptor,args);
  8.  
  9. observable.subscribeOn(Schedulers.from(httpExecutor));
  10.  
  11. //noinspection unchecked
  12. observable.subscribe(subscriber);
  13.  
  14. } catch (RetrofitError e) {
  15. subscriber.onError(errorHandler.handleError(e));
  16. } catch (Throwable e) {
  17. subscriber.onError(e);
  18. }
  19. }
  20. });
  21. }

在Retrofit项目here中创建了一个问题,我们将看看他们是否接受了它.

猜你在找的Android相关文章