做一些基本的网络爬虫。
我想使用Completable future与多个线程并行运行抓取。每个作业都检索需要剪贴的Page对象,并返回带有已建立URL列表的Page对象。
列表中的每个URL(如果尚未提交以进行抓取)将启动新作业。完成所有并行作业之后,我要继续进行逻辑处理。
如果我删除“ allFutures.thenRun(()-> executorService.shutdown());”,则该代码只会刮掉第一页对象,然后终止的问题。然后它会收集所有页面/ URL,但程序永远不会结束。
public class Demo
{
private final Set<Page> pages = new HashSet<>();
private final Set<Page> submittedPages = new HashSet<>();
private final ExecutorService executorService;
public Demo(final int numberOfThreads)
{
this.executorService = Executors.newFixedThreadPool(numberOfThreads);
}
public void start(String url) throws ExecutionException,InterruptedException
{
this.submitTask(new Page(url));
CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]));
allFutures.thenRun(() -> executorService.shutdown());
// do something with pages
}
private void submitTask(final Page page)
{
if (!this.submittedPages.contains(page))
{
this.submittedPages.add(page);
CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> new Task(page).call(),this.executorService) //want to run this parallel in multiple threads
.thenaccept(receivedPage -> {
this.savePage(receivedPage);
this.submitCollectedLinks(receivedPage);
});
completableFutureList.add(cf);
}
}
private void submitCollectedLinks(final Page page){
page.getLinks()
.stream()
.map(Page::new)
.forEach(this::submitTask);
}
private void savePage(final Page page)
{
this.pages.add(page);
}
}