我想通过某种操作来提高后端REST API的性能,该操作可以按顺序轮询多个不同的外部API并收集它们的响应,并将它们平整化为一个响应列表。
我最近才了解beforeEach
,因此决定尝试一下,并将该解决方案与只将我的CompletableFuture
更改为stream
的解决方案进行比较。
这是用于基准测试的代码:
parallelStream
有8个(伪)API的列表。每个响应需要4秒钟的时间来执行,并返回4个实体的列表(为简单起见,我们为字符串)。
结果:
-
package com.alithya.platon; import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; public class ConcurrentTest { static final List<String> REST_APIS = Arrays.asList("api1","api2","api3","api4","api5","api6","api7","api8"); MyTestUtil myTest = new MyTestUtil(); long millisBefore; // used to benchmark @BeforeEach void setUp() { millisBefore = System.currentTimeMillis(); } @AfterEach void tearDown() { System.out.printf("time taken : %.4fs\n",(System.currentTimeMillis() - millisBefore) / 1000d); } @Test void parallelSolution() { // 4s var parallel = REST_APIS.parallelStream() .map(api -> myTest.collectOneRestCall()) .flatMap(List::stream) .collect(Collectors.toList()); System.out.println("List of responses: " + parallel.toString()); } @Test void futureSolution() throws Exception { // 8s var futures = myTest.collectAllResponsesAsync(REST_APIS); System.out.println("List of responses: " + futures.get()); // only blocks here } @Test void originalProblem() { // 32s var sequential = REST_APIS.stream() .map(api -> myTest.collectOneRestCall()) .flatMap(List::stream) .collect(Collectors.toList()); System.out.println("List of responses: " + sequential.toString()); } } class MyTestUtil { public static final List<String> RESULTS = Arrays.asList("1","2","3","4"); List<String> collectOneRestCall() { try { TimeUnit.SECONDS.sleep(4); // simulating the await of the response } catch (Exception io) { throw new RuntimeException(io); } finally { return MyTestUtil.RESULTS; // always return something,for this demonstration } } CompletableFuture<List<String>> collectAllResponsesAsync(List<String> restApiUrlList) { /* Collecting the list of all the async requests that build a List<String>. */ List<CompletableFuture<List<String>>> completableFutures = restApiUrlList.stream() .map(api -> nonBlockingRestCall()) .collect(Collectors.toList()); /* Creating a single Future that contains all the Futures we just created ("flatmap"). */ CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutures .toArray(new CompletableFuture[restApiUrlList.size()])); /* When all the Futures have completed,we join them to create merged List<String>. */ CompletableFuture<List<String>> allCompletableFutures = allFutures .thenApply(future -> completableFutures.stream() .filter(Objects::nonNull) // we filter out the failed calls .map(CompletableFuture::join) .flatMap(List::stream) // creating a List<String> from List<List<String>> .collect(Collectors.toList()) ); return allCompletableFutures; } private CompletableFuture<List<String>> nonBlockingRestCall() { /* Manage the Exceptions here to ensure the wrapping Future returns the other calls. */ return CompletableFuture.supplyAsync(() -> collectOneRestCall()) .exceptionally(ex -> { return null; // gets managed in the wrapping Future }); } }
:32秒 -
stream
:4秒 -
parallelStream
:8秒
我很惊讶,并期望最后两个几乎相同。到底是什么造成了这种差异?据我所知,他们俩都使用CompletableFuture
。
我的天真的解释是ForkJoinPool.commonPool()
(因为它是阻塞操作)将实际的parallelStream
用于其工作负载,因此与{{1 }}是异步的,因此无法使用该MainThread
。