我最近一直在使用CompletableFuture,试图并行化持久的IO操作。返回之前,我必须等待所有事情完成,所以我都使用了
CompletableFuture.allOf().join()
和
stream().map(CompletableFuture::join)
强制执行此操作。但是,查看日志,我的印象是allOf()。join更快。我玩了一个测试看看。最后的输出始终显示流连接花费的时间更长。我看到的一件事是,如果我跳过CompleteableFuture.supplyAsync()中的System.out.println(),则差异会较小。
测试的典型输出如下:
streamJoin: 3196 ms,allof: 3055 ms
是allOf()。join(),然后以最快的速度收集,还是我的测试有缺陷?
package com.oyvind.completablefutures;
import org.junit.Test;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class CompletableFutureTest {
@Test
public void joinPerformanceTest() throws Exception{
// stream join
long startStreamJoin = System.currentTimeMillis();
List<Integer> result1 = listOfCompletableFutures("stream",Executors.newFixedThreadPool(100))
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());// trigger execution
long spentTimeStreamJoin = System.currentTimeMillis() - startStreamJoin;
// allOf() join
long startAllOf = System.currentTimeMillis();
List<CompletableFuture<Integer>> completableFutures = listOfCompletableFutures("allOf",Executors.newFixedThreadPool(100));
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();
List<Integer> result2 = completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
long spentTimeAllOf = System.currentTimeMillis() - startAllOf;
log("streamJoin: %s ms,allof: %s ms",spentTimeStreamJoin,spentTimeAllOf);
}
private List<CompletableFuture<Integer>> listOfCompletableFutures(String name,Executor executor) {
return IntStream.range(1,1000)
.boxed()
.map(
i -> CompletableFuture.supplyAsync(
() -> logAndSleepFor1Second(name),executor
)
)
.collect(Collectors.toList());
}
private int logAndSleepFor1Second(String name) {
log("Starting %s: %s",name,Thread.currentThread().getName());
try {
Thread.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 1;
}
private void log(String format,Object... args) {
System.out.println(LocalDateTime.now() + ": " + String.format(format,args));
}
}