RxJava取消订阅的各种方式的实现

前端之家收集整理的这篇文章主要介绍了RxJava取消订阅的各种方式的实现前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。

手动取消订阅

Consumer类型

Observable创建返回Disposable取消

  1. public class SecondActivity extends AppCompatActivity {
  2.  
  3. private static final String TAG = "SecondActivity";
  4. private Disposable disposable;
  5.  
  6. @Override
  7. protected void onCreate(Bundle savedInstanceState) {
  8. super.onCreate(savedInstanceState);
  9. setContentView(R.layout.activity_second);
  10. disposable = Observable.create(new ObservableOnSubscribe<String>() {
  11. @Override
  12. public void subscribe(ObservableEmitter<String> emitter) throws Exception {
  13. try {
  14. Thread.sleep(5000);
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }).subscribeOn(Schedulers.io())
  20. .observeOn(AndroidSchedulers.mainThread())
  21. .subscribe(new Consumer<String>() {
  22. @Override
  23. public void accept(String s) throws Exception {
  24. Log.d(TAG,"accept: "+s);
  25. }
  26. });
  27. }
  28.  
  29. @Override
  30. protected void onDestroy() {
  31. super.onDestroy();
  32. Log.d(TAG,"onDestroy: ");
  33. //取消订阅
  34. if(disposable != null && !disposable.isDisposed()){
  35. disposable.dispose();
  36. Log.d(TAG,"onDestroy: dispose");
  37. }
  38. }
  39. }

普通类型Observer

在Observer中获取Disposable然后取消

  1. public class ThirdActivity extends AppCompatActivity {
  2. private static final String TAG = "ThirdActivity";
  3. Disposable disposable;
  4.  
  5. @Override
  6. protected void onCreate(Bundle savedInstanceState) {
  7. super.onCreate(savedInstanceState);
  8. setContentView(R.layout.activity_third);
  9. Observable.create(new ObservableOnSubscribe<String>() {
  10. @Override
  11. public void subscribe(ObservableEmitter<String> emitter) throws Exception {
  12. try {
  13. Thread.sleep(5000);
  14. emitter.onNext("testInfo");
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }).subscribeOn(Schedulers.io())
  20. .observeOn(AndroidSchedulers.mainThread())
  21. .subscribe(new Observer<String>() {
  22. @Override
  23. public void onSubscribe(Disposable d) {
  24. disposable = d;
  25. }
  26.  
  27. @Override
  28. public void onNext(String s) {
  29. Log.d(TAG,"onNext: "+s);
  30. }
  31.  
  32. @Override
  33. public void onError(Throwable e) {
  34. Log.d(TAG,"onError: ");
  35. }
  36.  
  37. @Override
  38. public void onComplete() {
  39. Log.d(TAG,"onComplete: ");
  40. }
  41. });
  42. }
  43.  
  44. @Override
  45. protected void onDestroy() {
  46. super.onDestroy();
  47. Log.d(TAG,"onDestroy: ");
  48. //然后在需要取消订阅的地方调用即可
  49. if (disposable != null && !disposable.isDisposed()) {
  50. Log.d(TAG,"dispose: ");
  51. disposable.dispose();
  52. }
  53. }
  54. }

DisposableObserver类型

利用DisposableObserver和SubscribeWith直接返回Disposable,然后取消

  1. public class FourthActivity extends AppCompatActivity {
  2. private static final String TAG = "FourthActivity";
  3. private DisposableObserver<String> observer;
  4.  
  5. @Override
  6. protected void onCreate(Bundle savedInstanceState) {
  7. super.onCreate(savedInstanceState);
  8. setContentView(R.layout.activity_fourth);
  9. observer = Observable.create(new ObservableOnSubscribe<String>() {
  10. @Override
  11. public void subscribe(ObservableEmitter<String> emitter) throws Exception {
  12. try {
  13. Thread.sleep(5000);
  14. emitter.onNext("testInfo");
  15. } catch (InterruptedException e) {
  16. e.printStackTrace();
  17. }
  18. }
  19. }).subscribeOn(Schedulers.io())
  20. .observeOn(AndroidSchedulers.mainThread())
  21. .subscribeWith(new DisposableObserver<String>() {
  22. @Override
  23. public void onNext(String o) {
  24. Log.d(TAG,"onNext: "+o);
  25. }
  26.  
  27. @Override
  28. public void onError(Throwable e) {
  29. Log.d(TAG,"onError: ");
  30. }
  31.  
  32. @Override
  33. public void onComplete() {
  34. Log.d(TAG,"onComplete: ");
  35. }
  36. });
  37. }
  38.  
  39. @Override
  40. protected void onDestroy() {
  41. super.onDestroy();
  42. if (observer != null && !observer.isDisposed()) {
  43. Log.d(TAG,"dispose: ");
  44. observer.dispose();
  45. }
  46. }
  47. }

取消多个Observer

把多个Observer添加CompositeDisposable,一次取消

  1. public class ComDisposableActivity extends AppCompatActivity {
  2.  
  3. private Disposable disposable1;
  4. private Disposable disposable2;
  5. private static final String TAG = "ComDisposableActivity";
  6. @Override
  7. protected void onCreate(Bundle savedInstanceState) {
  8. super.onCreate(savedInstanceState);
  9. setContentView(R.layout.activity_com_disposable);
  10. Observable.create(new ObservableOnSubscribe<String>() {
  11. @Override
  12. public void subscribe(ObservableEmitter<String> emitter) throws Exception {
  13. try {
  14. Thread.sleep(5000);
  15. emitter.onNext("testInfo");
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }).subscribeOn(Schedulers.io())
  21. .observeOn(AndroidSchedulers.mainThread())
  22. .doOnDispose(new Action() {
  23. @Override
  24. public void run() throws Exception {
  25. Log.d(TAG,"run: Unsubscribing subscription from onCreate()");
  26. }
  27. })
  28. .subscribe(new Observer<String>() {
  29. @Override
  30. public void onSubscribe(Disposable d) {
  31. disposable1 = d;
  32. }
  33.  
  34. @Override
  35. public void onNext(String s) {
  36. Log.d(TAG,"onComplete: ");
  37. }
  38. });
  39. Observable.create(new ObservableOnSubscribe<String>() {
  40. @Override
  41. public void subscribe(ObservableEmitter<String> emitter) throws Exception {
  42. try {
  43. Thread.sleep(5000);
  44. emitter.onNext("testInfo");
  45. } catch (InterruptedException e) {
  46. e.printStackTrace();
  47. }
  48. }
  49. }).subscribeOn(Schedulers.io())
  50. .observeOn(AndroidSchedulers.mainThread())
  51. .subscribe(new Observer<String>() {
  52. @Override
  53. public void onSubscribe(Disposable d) {
  54. disposable2 = d;
  55. }
  56.  
  57. @Override
  58. public void onNext(String s) {
  59. Log.d(TAG,"onComplete: ");
  60. }
  61. });
  62. }
  63.  
  64. @Override
  65. protected void onDestroy() {
  66. super.onDestroy();
  67. CompositeDisposable compositeDisposable = new CompositeDisposable();
  68. //批量添加
  69. compositeDisposable.add(disposable1);
  70. compositeDisposable.add(disposable2);
  71. //最后一次性全部取消订阅
  72. compositeDisposable.dispose();
  73. }
  74. }

RxLifecyle取消

OnDestory取消

  1. Observable.interval(1,TimeUnit.SECONDS)
  2. .doOnDispose(new Action() {
  3. @Override
  4. public void run() throws Exception {
  5. Log.d(TAG,"Unsubscribing bindToLifecycle from onDestroy()");
  6. }
  7. })
  8. .compose(this.<Long>bindToLifecycle())
  9. .subscribe(new Consumer<Long>() {
  10. @Override
  11. public void accept(Long num) throws Exception {
  12. Log.d(TAG,"accept: " + num);
  13. }
  14. });

指定生命周期取消

  1. Observable.interval(1,"Unsubscribing UbindUntilEvent from onPause()");
  2. }
  3. }).compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
  4. .subscribe(new Consumer<Long>() {
  5. @Override
  6. public void accept(Long aLong) throws Exception {
  7. Log.d(TAG,"bindUntilEvent accept: " + aLong);
  8. }
  9. });

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

猜你在找的Android相关文章