等到每个外部线程上的RxJava有更多条件时-跨线程执行单个操作

我需要创建这个:

等到每个外部线程上的RxJava有更多条件时-跨线程执行单个操作

现在,当所有 signHash(..)在新线程上完成时,我有了一个用于创建新线程和在主线程上恢复简历的有效解决方案。 [请参阅文章末尾的工作代码]

您可以在图中看到,我必须创建的是一种在新的分离线程上管理 signHash(..)方法的一部分的“等待和恢复功能”的方法>现在可以通过睡眠进行模拟。

尤其是在必须出现在该位置的 String hash = doStuff(..)之后,我想填充一个像这样的bean:

import java.util.Hashtable;
import java.util.Map;

public class DocumentHashBucket {    
    private int numberNeededHashes;
    private boolean completedStoreHashes;
    private boolean completedStoreSignedHashes;
    private Map<String,byte[]> map = new Hashtable<>();
} 

为了存储由 String hash = doStuff(..)创建的散列,并且只有所有线程都完成了它们的 doStuff(..)到达 numberNeededHashes 我只能在单个线程上执行一次对互联网上签名为哈希的云服务的调用。当云上的签名哈希完成后,我可以更改带有签名哈希的地图,并允许恢复执行 doStuff2(signedHash);的 signHash(..),然后关闭方法/线程。

要求:必须在该位置调用 doStuff(..) doStuff2(signedHash)并管理单个外部云他们之间的通话。

问题:如何使用RxJava轻松实现?

非常感谢

我正在使用的工作代码:

package com.example.unit;

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.junit.jupiter.api.Test;

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Executor;

import static org.junit.jupiter.api.Assertions.*;

public class UTestRxJava {

    @Test
    void uRxJavaSimple() {

        // Create an iterable observables
        List<Integer> calls = new LinkedList<>();
        calls.add(1);
        calls.add(2);
        calls.add(3);
        calls.add(4);

        final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
        System.out.println("Starting parallel executions");

        // Create an iterable observables
        List<Observable<Integer>> observables = new LinkedList<>();
        for (final Integer i: calls) {
            System.out.println("Adding... "+i);
            observables.add(Observable.fromCallable(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return signHash(i);
                }
            }).subscribeon(Schedulers.newThread())); // subscribeon => specify the Scheduler on which an Observable will operate
        }

        final Map<String,String> mapResults = new HashMap<>();

        Observable.zip(observables,new Function<Object[],Object>() {
            @Override
            public Object apply(Object[] objects) throws Throwable { // Zip observables
                System.out.println("apply()");
                List<String> observables = new LinkedList<>();
                for (Object obj:objects) {
                    System.out.println("Applying... "+obj.toString());
                    observables.add(obj.toString());
                }
                return observables;
            }
        })
                .doOnNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object results) throws Throwable {
                        System.out.println("Ending parallel executions");
                    }
                })
                .observeon(Schedulers.from(new Executor() {
                    @Override
                    public void execute(Runnable runnable) {
                        tasks.add(runnable);// Add a scheduler with executor from the current thread
                    }}))
                .subscribe(new Consumer<Object>() {
                    // The Subscribe operator is the glue that connects an observer to an Observable
                    @Override
                    public void accept(Object results) throws Throwable { // Subscribe to the result.
                        // Put your code that needs to "wait"
                        for (String x : (List<String>)results) {
                            System.out.println("Results: "+x);
                            mapResults.put(x,"OK");
                        }
                    }
                });

        System.out.println("[START] TAKE-RUN");
        try {
            tasks.take().run();
        } catch (InterruptedException e) {e.printStackTrace();fail("it's not possible that there is an exception");}
        System.out.println("[END] TAKE-RUN");

        assertTrue(mapResults.size()==4);
    }

    private Integer signHash(Integer number) {
        String hash = doStuff(..)
        Integer result = number * number;
        System.out.println("Pre log \t"+Thread.currentThread().getName()+"\t"+ result);
        try {
            // TODO chage it with List<Hash> signHashesOnInternet(List<Hash>) only on one of all threads
            Thread.sleep(number * 1000);
        } catch (Exception e) {e.printStackTrace();}
        System.out.println("Post log \t"+Thread.currentThread().getName()+"\t"+ result);
        doStuff2(signedHash);
        return result;
    }

}

输出为:

Starting parallel executions
Adding... 1
Adding... 2
Adding... 3
Adding... 4
[START] TAKE-RUN
Pre log     RxNewThreadScheduler-1  1
Pre log     RxNewThreadScheduler-2  4
Pre log     RxNewThreadScheduler-3  9
Pre log     RxNewThreadScheduler-4  16
Post log    RxNewThreadScheduler-1  1
Post log    RxNewThreadScheduler-2  4
Post log    RxNewThreadScheduler-3  9
Post log    RxNewThreadScheduler-4  16
apply()
Applying... 1
Applying... 4
Applying... 9
Applying... 16
Ending parallel executions
Results: 1
Results: 4
Results: 9
Results: 16
[END] TAKE-RUN


Process finished with exit code 0
a8496558 回答:等到每个外部线程上的RxJava有更多条件时-跨线程执行单个操作

这是我发现并测试的内容。 我不知道这是否是最佳解决方案,但对我来说现在正在运行。

我使用 BehaviorSubject 及其方法 blockingForEach 来观察 DocumentHashBucket (这是我创建的bean)。

通过这种方法, blockingForEach 允许我拥有一个阻塞代码,直到 behaviorSubject.onComplete()为止,该代码不会继续执行其余代码被调用。

package com.example.unit;

import com.example.DocumentHashBucket;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import org.apache.commons.codec.binary.Base64;
import org.junit.jupiter.api.Test;

import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class UTestRxJavaStackOverflowSolution {

    @Test
    void uRxJavaSimple() {

        // Create an iterable observables
        List<Integer> calls = new LinkedList<>();
        calls.add(1);
        calls.add(2);
        calls.add(3);
        calls.add(4);

        // create the Observable that will be used for collect all hashes and store the signed version of each
        final DocumentHashBucket hashBucket = new DocumentHashBucket();
        hashBucket.setNumberDocuments(calls.size());
        final BehaviorSubject<DocumentHashBucket> behaviorSubject = BehaviorSubject.createDefault(hashBucket);

        // create the BlockingQueue in order to have a blocking point for the main thread
        final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
        System.out.println("Starting parallel executions");

        // Create an iterable observables
        List<Observable<Integer>> observables = new LinkedList<>();
        for (final Integer i: calls) {
            System.out.println("Adding... "+i);
            observables.add(Observable.fromCallable(new Callable<Integer>() {
                @Override
                public Integer call(){
                    Integer res = signHash(behaviorSubject,i);
                    assertTrue(res==i*i,"The result must be the square of the variable");
                    return res;
                }
            }).subscribeOn(Schedulers.newThread())); // subscribeOn => specify the Scheduler on which an Observable will operate
        }

        final Map<String,String> mapResults = new HashMap<>();

        Observable.zip(observables,new Function<Object[],Object>() {
            @Override
            public Object apply(Object[] objects) throws Throwable { // Zip observables
                System.out.println("apply()");
                List<String> observables = new LinkedList<>();
                for (Object obj:objects) {
                    System.out.println("Applying... "+obj.toString());
                    observables.add(obj.toString());
                }
                return observables;
            }
        })
                .doOnNext(new Consumer<Object>() {
                    @Override
                    public void accept(Object results){
                        System.out.println("Ending parallel executions");
                    }
                })
                .doOnError(new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable){
                        System.err.println("Error on execution of "+Thread.currentThread().getName()+" : "+ throwable.getMessage());
                        throwable.printStackTrace();
                    }
                })
                .observeOn(Schedulers.from(new Executor() {
                    @Override
                    public void execute(Runnable runnable) {
                        tasks.add(runnable);// Add a scheduler with executor from the current thread
                    }}))
                .subscribe(new Consumer<Object>() {
                               // The Subscribe operator is the glue that connects an observer to an Observable
                               @Override
                               public void accept(Object onNext) { // Subscribe to the result.
                                   // Put your code that needs to "wait"
                                   for (String x : (List<String>) onNext) {
                                       System.out.println("Results: " + x);
                                       mapResults.put(x,"OK");
                                   }
                               }
                           },new Consumer<Throwable>() {
                            // The Subscribe operator is the glue that connects an observer to an Observable
                            @Override
                            public void accept(Throwable onError) { // Subscribe to the result.
                                System.err.println("Error on execution in one thread detected on this (main) thread : " + onError.getMessage());
                                onError.printStackTrace();
                            }
                        },new Action() {
                            @Override
                            public void run(){
                                System.out.println("onComplete");
                            }
                        });

        System.out.println("[START] TAKE-RUN");
        try {
            tasks.take().run();
        } catch (InterruptedException e) {
            System.err.println("Error on execution of zip: "+e.getMessage());
            e.printStackTrace();
            fail("it's not possible that there is an exception");
        }
        System.out.println("[END] TAKE-RUN");

        assertTrue(mapResults.size()==4);
    }

    private Integer signHash(final BehaviorSubject<DocumentHashBucket> behaviorSubject,Integer number) {
        System.out.println(Thread.currentThread().getName()+" [START] doStuff()");
        try {
            Thread.sleep(number * 1000);
        } catch (Exception e) {e.printStackTrace();}
        System.out.println(Thread.currentThread().getName()+" [END] doStuff()");


        DocumentHashBucket hashBucket = behaviorSubject.getValue();
        byte[] numberStringInBytes = ("" + number).getBytes();
        String key = Base64.encodeBase64String(numberStringInBytes);
        hashBucket.getMap().put(key,numberStringInBytes);
        System.out.println(Thread.currentThread().getName()+" hashBucket.getMap().put new hash : "+key);

        behaviorSubject.blockingForEach(new Consumer<DocumentHashBucket>() {
            @Override
            public void accept(DocumentHashBucket documentHashBucket) throws Throwable {
                System.out.println(Thread.currentThread().getName()+" [blockingForEach] DocumentHashBucket is changed and now contains "+documentHashBucket.getMap().size()+" elements");
                synchronized (documentHashBucket){
                    if(documentHashBucket.getNumberDocuments()==documentHashBucket.getMap().size() && documentHashBucket.isCompletedStoreHashes()==false){

                        System.out.println(Thread.currentThread().getName()+"|> --------- all hashes arrived ---------");
                        // all hashes arrived
                        documentHashBucket.setCompletedStoreHashes(true);

                        System.out.println(Thread.currentThread().getName()+" [START] simulate signHashesOnInternet(..)");
                        try {
                            // simulate network call
                            Thread.sleep(10 * 1000);
                        } catch (Exception e) {e.printStackTrace();}
                        // simulate signing hash on the DocumentHashBucket
                        for(String key : documentHashBucket.getMap().keySet()){
                            int value = Integer.parseInt(new String(documentHashBucket.getMap().get(key)));
                            documentHashBucket.getMap().put(key,(""+(value*value)).getBytes());
                        }
                        System.out.println(Thread.currentThread().getName()+" [END] simulate signHashesOnInternet(..)");

                        // DocumentHashBucket - now hashes are signed
                        documentHashBucket.setCompletedStoreSignedHashes(true);

                        // unlock the blockingForEach
                        behaviorSubject.onComplete();

                        System.out.println(Thread.currentThread().getName()+"|> --------- all hashes SAVED ---------");
                    }else{
                        System.out.println(Thread.currentThread().getName()+"changed but hashes are: "+documentHashBucket.getMap().size());
                    }
                }// synchronized
            }
        });

        // check that the signing process is applied correctly
        assertTrue(Arrays.equals(hashBucket.getMap().get(key),(""+(number*number)).getBytes()));

        System.out.println(Thread.currentThread().getName()+" [START] doStuff2()");
        try {
            Thread.sleep(number * 1000);
        } catch (Exception e) {e.printStackTrace();}
        System.out.println(Thread.currentThread().getName()+" [END] doStuff2()");

        assertTrue(hashBucket.getMap().size()==4,"The map must contains 4 elements");
        assertTrue(hashBucket.isCompletedStoreHashes(),"The flag completedStoreHashes must be true");
        assertTrue(hashBucket.isCompletedStoreSignedHashes(),"The flag completedStoreSignedHashes must be true");

        return number*number;
    }
}

执行日志:

Starting parallel executions
Adding... 1
Adding... 2
Adding... 3
Adding... 4
RxNewThreadScheduler-1 [START] doStuff()
RxNewThreadScheduler-2 [START] doStuff()
RxNewThreadScheduler-3 [START] doStuff()
[START] TAKE-RUN
RxNewThreadScheduler-4 [START] doStuff()
RxNewThreadScheduler-1 [END] doStuff()
RxNewThreadScheduler-1 hashBucket.getMap().put new hash : MQ==
RxNewThreadScheduler-1 [blockingForEach] DocumentHashBucket is changed and now contains 1 elements
RxNewThreadScheduler-1changed but hashes are: 1
RxNewThreadScheduler-2 [END] doStuff()
RxNewThreadScheduler-2 hashBucket.getMap().put new hash : Mg==
RxNewThreadScheduler-2 [blockingForEach] DocumentHashBucket is changed and now contains 2 elements
RxNewThreadScheduler-2changed but hashes are: 2
RxNewThreadScheduler-3 [END] doStuff()
RxNewThreadScheduler-3 hashBucket.getMap().put new hash : Mw==
RxNewThreadScheduler-3 [blockingForEach] DocumentHashBucket is changed and now contains 3 elements
RxNewThreadScheduler-3changed but hashes are: 3
RxNewThreadScheduler-4 [END] doStuff()
RxNewThreadScheduler-4 hashBucket.getMap().put new hash : NA==
RxNewThreadScheduler-4 [blockingForEach] DocumentHashBucket is changed and now contains 4 elements
RxNewThreadScheduler-4|> --------- all hashes arrived ---------
RxNewThreadScheduler-4 [START] simulate signHashesOnInternet(..)
RxNewThreadScheduler-4 [END] simulate signHashesOnInternet(..)
RxNewThreadScheduler-4|> --------- all hashes SAVED ---------
RxNewThreadScheduler-2 [START] doStuff2()
RxNewThreadScheduler-1 [START] doStuff2()
RxNewThreadScheduler-4 [START] doStuff2()
RxNewThreadScheduler-3 [START] doStuff2()
RxNewThreadScheduler-1 [END] doStuff2()
RxNewThreadScheduler-2 [END] doStuff2()
RxNewThreadScheduler-3 [END] doStuff2()
RxNewThreadScheduler-4 [END] doStuff2()
apply()
Applying... 1
Applying... 4
Applying... 9
Applying... 16
Ending parallel executions
Results: 1
Results: 4
Results: 9
Results: 16
onComplete
[END] TAKE-RUN


Process finished with exit code 0
,

以下代码是可能的解决方案。

它将创建一个PublishSubject,它将消耗每个线程上doStuff()之后的无符号哈希。

然后,它通过转换PublishSubject来创建一个可观察对象,该对象将发出由API调用签名的哈希。

此已转换的可观察对象使用replay()进行共享,并且在signHash函数中被阻塞等待,因此signHash的所有调用将等待同一API调用的结果。 / p>

package com.example.unit;

import io.reactivex.Observable;
import io.reactivex.Single;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.PublishSubject;
import org.junit.Test;

import java.util.*;

import static junit.framework.TestCase.assertEquals;
import static junit.framework.TestCase.assertTrue;

public class UTestRxJava {

    @Test
    public void test() {

        List<Integer> calls = Arrays.asList(1,2,3,4);

        PublishSubject<Map.Entry<String,byte[]>> hashSubject = PublishSubject.create();

        ConnectableObservable<Map<String,byte[]>> connectable = hashSubject
                .toSerialized() // This is needed because we will post to the subject from different threads
                .take(calls.size()) // Wait for all unsigned hashes to arrive
                .toMap(Map.Entry::getKey,Map.Entry::getValue) // combine all unsigned hashes to a single map
                .flatMapObservable( // use the combined map to create an observable by...
                        unsignedHashes -> Observable.fromCallable(
                                () -> apiCall(unsignedHashes)) // calling the synchronous api with the map
                                .subscribeOn(Schedulers.io()))  // the api will be performed on the io() scheduler
                .replay(); // all subscribers will get the same api result

        connectable.connect();

        Map<Integer,Integer> result = Observable.fromArray(calls.toArray(new Integer[0]))
                .flatMap((Integer i) ->
                        Observable.fromCallable(() -> new AbstractMap.SimpleImmutableEntry<>(i,signHash(i,hashSubject,connectable.firstOrError())))
                                .subscribeOn(Schedulers.newThread()))
                .toMap(Map.Entry::getKey,Map.Entry::getValue)
                .blockingGet();

        assertEquals(4,result.size());
        System.out.println(Thread.currentThread().getName() + " Result list: " + Arrays.toString(result.entrySet().toArray()));
    }

    private Map<String,byte[]> apiCall(Map<String,byte[]> unsignedHashes) throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + " Calling API with unsigned hashes: " + Arrays.toString(unsignedHashes.entrySet().toArray()));

        Map<String,byte[]> signedHashes = new HashMap<>();

        for (Map.Entry<String,byte[]> entry : unsignedHashes.entrySet()) {
            int value = Integer.parseInt(new String(entry.getValue()));
            signedHashes.put(entry.getKey(),("" + (value * value)).getBytes());
        }
        Thread.sleep(1000);
        return signedHashes;
    }

    private Integer signHash(Integer number,PublishSubject<Map.Entry<String,byte[]>> hashSubject,Single<Map<String,byte[]>> hashesSignedByApi) {
        System.out.println(Thread.currentThread().getName() + " [START] doStuff()");
        try {
            Thread.sleep(number * 1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " [END] doStuff()");

        byte[] numberStringInBytes = ("" + number).getBytes();
        String key = Base64.getEncoder().encodeToString(numberStringInBytes);

        hashSubject.onNext(new AbstractMap.SimpleImmutableEntry<>(key,numberStringInBytes));
        System.out.println(Thread.currentThread().getName() + " hashSubject.onNext with new hash : " + key);

        Map<String,byte[]> signedHashes = hashesSignedByApi.blockingGet();

        // check that the signing process is applied correctly
        assertTrue(Arrays.equals(signedHashes.get(key),("" + (number * number)).getBytes()));

        System.out.println(Thread.currentThread().getName() + " [START] doStuff2()");
        try {
            Thread.sleep(number * 1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " [END] doStuff2()");

        return Integer.valueOf(new String(signedHashes.get(key)));
    }
}
本文链接:https://www.f2er.com/3116453.html

大家都在问