手动取消订阅
Consumer类型
Observable创建返回Disposable取消
- public class SecondActivity extends AppCompatActivity {
- private static final String TAG = "SecondActivity";
- private Disposable disposable;
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_second);
- disposable = Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> emitter) throws Exception {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Consumer<String>() {
- @Override
- public void accept(String s) throws Exception {
- Log.d(TAG,"accept: "+s);
- }
- });
- }
- @Override
- protected void onDestroy() {
- super.onDestroy();
- Log.d(TAG,"onDestroy: ");
- //取消订阅
- if(disposable != null && !disposable.isDisposed()){
- disposable.dispose();
- Log.d(TAG,"onDestroy: dispose");
- }
- }
- }
普通类型Observer
在Observer中获取Disposable然后取消
- public class ThirdActivity extends AppCompatActivity {
- private static final String TAG = "ThirdActivity";
- Disposable disposable;
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_third);
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> emitter) throws Exception {
- try {
- Thread.sleep(5000);
- emitter.onNext("testInfo");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Observer<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- disposable = d;
- }
- @Override
- public void onNext(String s) {
- Log.d(TAG,"onNext: "+s);
- }
- @Override
- public void onError(Throwable e) {
- Log.d(TAG,"onError: ");
- }
- @Override
- public void onComplete() {
- Log.d(TAG,"onComplete: ");
- }
- });
- }
- @Override
- protected void onDestroy() {
- super.onDestroy();
- Log.d(TAG,"onDestroy: ");
- //然后在需要取消订阅的地方调用即可
- if (disposable != null && !disposable.isDisposed()) {
- Log.d(TAG,"dispose: ");
- disposable.dispose();
- }
- }
- }
DisposableObserver类型
利用DisposableObserver和subscribeWith直接返回Disposable,然后取消
- public class FourthActivity extends AppCompatActivity {
- private static final String TAG = "FourthActivity";
- private DisposableObserver<String> observer;
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_fourth);
- observer = Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> emitter) throws Exception {
- try {
- Thread.sleep(5000);
- emitter.onNext("testInfo");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribeWith(new DisposableObserver<String>() {
- @Override
- public void onNext(String o) {
- Log.d(TAG,"onNext: "+o);
- }
- @Override
- public void onError(Throwable e) {
- Log.d(TAG,"onError: ");
- }
- @Override
- public void onComplete() {
- Log.d(TAG,"onComplete: ");
- }
- });
- }
- @Override
- protected void onDestroy() {
- super.onDestroy();
- if (observer != null && !observer.isDisposed()) {
- Log.d(TAG,"dispose: ");
- observer.dispose();
- }
- }
- }
取消多个Observer
把多个Observer的Disposable添加到CompositeDisposable,一次性取消
- // Demo: every subscription's Disposable is collected in one CompositeDisposable as soon
- // as onSubscribe() delivers it, and all of them are disposed together in onDestroy().
- public class ComDisposableActivity extends AppCompatActivity {
- private static final String TAG = "ComDisposableActivity";
- // Holds the Disposables of all subscriptions started by this Activity. Created eagerly
- // so onDestroy() never has to add a possibly-null Disposable.
- private final CompositeDisposable compositeDisposable = new CompositeDisposable();
- @Override
- protected void onCreate(Bundle savedInstanceState) {
- super.onCreate(savedInstanceState);
- setContentView(R.layout.activity_com_disposable);
- // First subscription: also logs when it gets disposed.
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> emitter) throws Exception {
- try {
- Thread.sleep(5000);
- emitter.onNext("testInfo");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .doOnDispose(new Action() {
- @Override
- public void run() throws Exception {
- Log.d(TAG,"run: Unsubscribing subscription from onCreate()");
- }
- })
- .subscribe(new Observer<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- // Register for batch cancellation.
- compositeDisposable.add(d);
- }
- @Override
- public void onNext(String s) {
- Log.d(TAG,"onNext: "+s);
- }
- @Override
- public void onError(Throwable e) {
- Log.d(TAG,"onError: ");
- }
- @Override
- public void onComplete() {
- Log.d(TAG,"onComplete: ");
- }
- });
- // Second subscription: same source, registered in the same composite.
- Observable.create(new ObservableOnSubscribe<String>() {
- @Override
- public void subscribe(ObservableEmitter<String> emitter) throws Exception {
- try {
- Thread.sleep(5000);
- emitter.onNext("testInfo");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }).subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Observer<String>() {
- @Override
- public void onSubscribe(Disposable d) {
- // Register for batch cancellation.
- compositeDisposable.add(d);
- }
- @Override
- public void onNext(String s) {
- Log.d(TAG,"onNext: "+s);
- }
- @Override
- public void onError(Throwable e) {
- Log.d(TAG,"onError: ");
- }
- @Override
- public void onComplete() {
- Log.d(TAG,"onComplete: ");
- }
- });
- }
- @Override
- protected void onDestroy() {
- super.onDestroy();
- // Cancel every registered subscription in one call.
- compositeDisposable.dispose();
- }
- }
RxLifecycle取消
onDestroy取消
- // Logs when the interval source is disposed.
- Action logOnDispose = new Action() {
- @Override
- public void run() throws Exception {
- Log.d(TAG,"Unsubscribing bindToLifecycle from onDestroy()");
- }
- };
- // Logs each value emitted by the interval.
- Consumer<Long> logTick = new Consumer<Long>() {
- @Override
- public void accept(Long tick) throws Exception {
- Log.d(TAG,"accept: " + tick);
- }
- };
- // Emits once per second; doOnDispose sits before compose(), same order as before.
- // NOTE(review): presumably bindToLifecycle() ends the stream at the lifecycle event that
- // mirrors the one in which it was subscribed - confirm against RxLifecycle docs.
- Observable.interval(1,TimeUnit.SECONDS)
- .doOnDispose(logOnDispose)
- .compose(this.<Long>bindToLifecycle())
- .subscribe(logTick);
指定生命周期取消
- // Emits once per second and is bound to ActivityEvent.PAUSE via bindUntilEvent().
- // NOTE(review): the head of this chain was reconstructed by analogy with the
- // bindToLifecycle example in these notes - confirm against the original snippet.
- Observable.interval(1,TimeUnit.SECONDS)
- .doOnDispose(new Action() {
- @Override
- public void run() throws Exception {
- // Logs when the interval source is disposed.
- Log.d(TAG,"Unsubscribing UbindUntilEvent from onPause()");
- }
- }).compose(this.<Long>bindUntilEvent(ActivityEvent.PAUSE))
- .subscribe(new Consumer<Long>() {
- @Override
- public void accept(Long aLong) throws Exception {
- Log.d(TAG,"bindUntilEvent accept: " + aLong);
- }
- });