通过终止ExecutorService可以解决的将来的问题

做一些基本的网络爬虫。

我想使用Completable future与多个线程并行运行抓取。每个作业都检索需要剪贴的Page对象,并返回带有已建立URL列表的Page对象。

列表中的每个URL(如果尚未提交以进行抓取)将启动新作业。完成所有并行作业之后,我要继续进行逻辑处理。

如果我删除“ allFutures.thenRun(()-> executorService.shutdown());”,则该代码只会刮掉第一页对象,然后终止的问题。然后它会收集所有页面/ URL,但程序永远不会结束。

public class Demo
{
    private final Set<Page> pages = new HashSet<>();
    private final Set<Page> submittedPages = new HashSet<>();

    private final ExecutorService executorService;

    public Demo(final int numberOfThreads)
    {
        this.executorService = Executors.newFixedThreadPool(numberOfThreads);
    }

    public void start(String url) throws ExecutionException,InterruptedException
    {
        this.submitTask(new Page(url));
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]));
        allFutures.thenRun(() -> executorService.shutdown());

        // do something with pages
    }


    private void submitTask(final Page page)
    {
        if (!this.submittedPages.contains(page))
        {
            this.submittedPages.add(page);
            CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> new Task(page).call(),this.executorService) //want to run this parallel in multiple threads
               .thenaccept(receivedPage -> {
                   this.savePage(receivedPage);
                   this.submitCollectedLinks(receivedPage);
               });
            completableFutureList.add(cf);

        }
    }

    private void submitCollectedLinks(final Page page){
        page.getLinks()
          .stream()
          .map(Page::new)
          .forEach(this::submitTask);
    }

    private void savePage(final Page page)
    {
        this.pages.add(page);
    }

}
jiayibeibao 回答:通过终止ExecutorService可以解决的将来的问题

您的代码有几个问题。您计划在completableFutureList的快照完成时关闭执行程序服务,这可能会在以后添加更多的期货,但是更糟糕的是,您甚至到达了// do something with pages时,快照尚未完成。

您没有显示completableFutureList的声明,但是考虑到pagessubmittedPages(它们是在不同线程中修改的)是用HashSet初始化的,即不是线程安全的,我对列表也不满意。但是您仍然不需要该列表。您应该更改提交代码,以返回表示正在组成的待处理任务和后续任务的期货。前提条件阶段完成后,传递给thenCompose的函数将得到评估,换句话说,这允许对函数链接时未知的期货进行依赖。

请注意,用线程安全的结构替换HashSet并不够。您必须避免在contains之前调用add之类的序列,因为这无法保证在这两个调用之间没有其他线程会执行add(称为“ check-then-act”反-图案)。您可以仅使用add,该元素将不执行任何操作,并且在元素已存在时返回false。使用正确的线程安全Set实现,它可以提供所需的原子性。

将这些东西放在一起,您将得到例如

public class Demo {
    private final Set<Page> pages = ConcurrentHashMap.newKeySet();
    private final Set<Page> submittedPages = ConcurrentHashMap.newKeySet();

    private final ExecutorService executorService;

    public Demo(final int numberOfThreads) {
        this.executorService = Executors.newFixedThreadPool(numberOfThreads);
    }

    public void start(String url) {
        this.submitTask(new Page(url))
            // shutdown even in the exceptional case
            .whenComplete((_void,throwable) -> executorService.shutdown())
            .join(); // wait for completion before doing something with pages

        // do something with pages
    }

    private CompletableFuture<Void> submitTask(final Page page) {
        // use a single add to avoid check-then-act anti-pattern
        if(this.submittedPages.add(page)) {
            return CompletableFuture.supplyAsync(new Task(page)::call,executorService)
                // compose with recursively encountered tasks
               .thenCompose(receivedPage -> {
                   this.savePage(receivedPage);
                   return this.submitCollectedLinks(receivedPage);
               });
        }

        // do nothing when already submitted
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> submitCollectedLinks(final Page page) {
        return CompletableFuture.allOf(page.getLinks()
          .stream().map(Page::new).map(this::submitTask)
          .toArray(CompletableFuture<?>[]::new));
    }

    private void savePage(final Page page) {
        this.pages.add(page);
    }
}
本文链接:https://www.f2er.com/3049919.html

大家都在问