将异步调用转换为RxJava,如何在处理完所有项目后手动完成操作? 您的emitter.onComplete()正好在该位置。

我有一个自定义类,用于将单个文件或多个文件上传到Firebase存储:

public class FirebaseUploader {
    private Context appContext;
    private FirebaseStorage storage;

    public FirebaseUploader(Context appContext) {
        this.appContext = appContext;
        this.storage = FirebaseStorage.getInstance();
    }

    // Uploading a single file
    public Observable<String> send(Uri file) {
        return toObservable(Observable.just(file));
    }

    // Uploading multiple files
    public Observable<String> send(List<Uri> files) {
        return toObservable(Observable.fromIterable(files));
    }

    private Observable<String> toObservable(Observable<Uri> observable) {
        return observable.flatMap(this::upload);
    }

    private Observable<String> upload(Uri file) {
        return Observable.create(new ObservableonSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                StorageReference branch = storage.getReference().child("images");
                StorageReference leaf = branch.child(generateUniqueId(file));
                UploadTask uploadTask = leaf.putFile(file);

                uploadTask.continueWithTask(new Continuation<UploadTask.Tasksnapshot,Task<Uri>>() {
                    @Override
                    public Task<Uri> then(@NonNull Task<UploadTask.Tasksnapshot> task) throws Exception {
                        if (!task.isSuccessful()) emitter.onError(task.getException());
                        return leaf.getDownloadUrl();
                    }
                }).addOnCompleteListener(new OnCompleteListener<Uri>() {
                    @Override
                    public void onComplete(@NonNull Task<Uri> task) {
                        if (task.isSuccessful()) {
                            Uri downloadUri = task.getResult();
                            String url = downloadUri.toString();
                            emitter.onNext(url);
                            //emitter.onComplete();
                        } else {
                            // Handle failures
                            // ...
                        }
                    }
                });
            }
        });
    }

    private String generateUniqueId(Uri file) {
        String fileType = appContext.getcontentResolver().getType(file);
        String extension = (fileType.equals("image/jpeg")) ? ".jpg" : ".png";
        String fileName = UUID.randomUUID().toString();
        return fileName + extension;
    }
}

实际上,我花了几天时间才提出这个解决方案,因为最初我不知道如何将Firebase异步调用转换为RxJava。然后我呆了好一阵子,想知道为什么可观察对象没有完成,直到我最终意识到我正在调用emitter.onNextemitter.onError,但是我的代码中没有emitter.onComplete

因此emitter.onCompleteOnCompleteListener内部。但是它已被注释掉-那不是应该打电话的地方。我一直在寻找的是一种仅在处理完所有文件之后在.onComplete()上调用emitter的方法。以我对RxJava的有限经验,我看到的唯一方法是首先对要上传的文件数量进行计数,然后在达到该数量时手动完成序列。有没有更好的方法?

wolflaw 回答:将异步调用转换为RxJava,如何在处理完所有项目后手动完成操作? 您的emitter.onComplete()正好在该位置。

您的emitter.onComplete()正好在该位置。

upload方法返回的Observable只负责一件事:上传Uri参数指定的单个文件。 UploadTask完成后便完成了该任务,因此您应该在此时致电onComplete

flatMap运算符负责将所有这些结果收集到一个Observable中。上传完所有文件后,它将向下游发出onComplete,这正是您想要的。

本文链接:https://www.f2er.com/3063747.html

大家都在问