加入CompletableFutures

我最近一直在使用CompletableFuture,试图并行化持久的IO操作。返回之前,我必须等待所有事情完成,所以我都使用了

CompletableFuture.allOf().join()stream().map(CompletableFuture::join)

强制执行此操作。但是,查看日志,我的印象是allOf()。join更快。我玩了一个测试看看。最后的输出始终显示流连接花费的时间更长。我看到的一件事是,如果我跳过CompleteableFuture.supplyAsync()中的System.out.println(),则差异会较小。

测试的典型输出如下: streamJoin: 3196 ms,allof: 3055 ms

是allOf()。join(),然后以最快的速度收集,还是我的测试有缺陷?

package com.oyvind.completablefutures;

import org.junit.Test;

import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CompletableFutureTest {

    @Test
    public void joinPerformanceTest() throws Exception{

        // stream join
        long startStreamJoin = System.currentTimeMillis();
        List<Integer> result1 = listOfCompletableFutures("stream",Executors.newFixedThreadPool(100))
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());// trigger execution
        long spentTimeStreamJoin = System.currentTimeMillis() - startStreamJoin;

        // allOf() join
        long startAllOf = System.currentTimeMillis();
        List<CompletableFuture<Integer>> completableFutures = listOfCompletableFutures("allOf",Executors.newFixedThreadPool(100));
        CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).join();
        List<Integer> result2 = completableFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
        long spentTimeAllOf = System.currentTimeMillis() - startAllOf;

        log("streamJoin: %s ms,allof: %s ms",spentTimeStreamJoin,spentTimeAllOf);
    }

    private List<CompletableFuture<Integer>> listOfCompletableFutures(String name,Executor executor) {

        return IntStream.range(1,1000)
                .boxed()
                .map(
                        i -> CompletableFuture.supplyAsync(
                                () -> logAndSleepFor1Second(name),executor
                        )
                )
                .collect(Collectors.toList());
    }

    private int logAndSleepFor1Second(String name) {
        log("Starting %s: %s",name,Thread.currentThread().getName());
        try {
            Thread.sleep(300);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        return 1;
    }

    private void log(String format,Object... args) {
        System.out.println(LocalDateTime.now() + ": " + String.format(format,args));
    }
}
bluemoon88888 回答:加入CompletableFutures

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/2938108.html

大家都在问