Spring Boot:我们如何实现多个@Scheduled任务,每个任务都有自己的线程池?

我想实现多个@Scheduled(具有固定延迟)任务,每个任务都有自己的线程池。

@Scheduled(fixedDelayString = "30000")
public void createOrderSchedule() {
    //create 10 orders concurrently; wait for all to be finished
    createOrder(10);
}

@Scheduled(fixedDelayString = "30000")
public void processOrderSchedule() {
    //process 10 orders concurrently; wait for all to be finished
}

@Scheduled(fixedDelayString = "30000")
public void notifySchedule() {
    //send notification for 10 orders concurrently; wait for all to be finished
}

我设法为每个调度程序创建了不同的ThreadPoolTaskExecutor,如下所示:

@Bean("orderPool")
public ThreadPoolTaskExecutor createOrderTaskExecutor() {
    ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
    pool.setCorePoolSize(5);
    pool.setMaxPoolSize(10);
    pool.setThreadNamePrefix("order-thread-pool-");
    pool.setWaitForTasksToCompleteonShutdown(true);
    return pool;
}

..

我为每个任务提供了@Async

@Async("orderPool")
public void createOrder(Integer noOforders) {..}

和任务计划程序配置

@Bean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
    ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
    threadPoolTaskScheduler.setPoolSize(3);
    return threadPoolTaskScheduler;
}

我使用CompletableFuture.allOf(..).join();等待每个任务完成,但它阻塞了其他@Scheduled个任务。

总而言之,我想实现以下目标:

  1. 每个@Scheduled任务应独立运行,而不会阻止其他@Scheduled个任务。
  2. 每个@Scheduled任务都应该具有自己的线程池,以便它可以同时运行多个子任务(例如10)。
  3. 每个@Scheduled任务必须等待每个触发器完成而不会再次被调用。

我该如何实现?

nmqxcb 回答:Spring Boot:我们如何实现多个@Scheduled任务,每个任务都有自己的线程池?

经过将近18个小时的训练,我能够实现我在​​上述问题中提出的要求。很抱歉这么晚。

因此,流API提供了IntStream之类的接口,以并行地流传输元素。这导致我创建了n个并行订单。 (同时,进程k在不同的调度程序中以并行方式排序。依此类推。)

IntStream.range(0,inputIds.size())
        .parallel().forEach(index -> createOrder(inputIds.get(index)));

就这么简单。解决了1个用例。现在,我希望此调度程序具有自己的池。发现IntStream.parallel()使用ForkJoinPool,它是我们自己的ExecutorService的后继者,令我惊讶的是,spring为ForkJoinPool提供了一个预配置的工厂bean,即{{1 }}。因此,我创建了一个名为ForkJoinPoolFactoryBean的bean。

createOrderExecutor

我将这个bean自动连接到调度程序类中,并同时提交了所有订单,如下所示。

@Bean("createOrderExecutor")
public ForkJoinPoolFactoryBean createOrderExecutor() {
    ForkJoinPoolFactoryBean createOrderPoolFactoryBean = new ForkJoinPoolFactoryBean();
    createOrderPoolFactoryBean.setParallelism(10);
    createOrderPoolFactoryBean.setAsyncMode(true);
    createOrderPoolFactoryBean.setUncaughtExceptionHandler(null);
    createOrderPoolFactoryBean.setThreadFactory(p -> {
        final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(p);
        worker.setName("create-order-pool-" + worker.getPoolIndex());
        return worker;
    });
    return createOrderPoolFactoryBean;
}

在那里。解决第二个用例。现在,这将不会等待所有并行任务完成,而只会异步触发它们。现在,createOrderExecutor.getObject().submit(() -> IntStream.range(0,inputIds.size()) .parallel().forEach(index -> createOrder(inputIds.get(index)))); (由ForkJoinTask返回)提供了一种submit()方法,该方法等待计算完成并返回结果。 (但是我不需要结果,我宁愿用get()包围它们。而且,我只等待完成。)

try-catch

这解决了我的最后一个用例。我为应用程序中的所有调度程序执行了此操作。

相信我,我尝试了在线上可用的@Scheduled(fixedDelayString = "5000") public void createOrderScheduler() { createOrderExecutor.getObject().submit(() -> IntStream.range(0,inputIds.size()) .parallel().forEach(index -> createOrder(inputIds.get(index)))).get(); } 的几乎所有实现,但无法实现所有这些功能。

本文链接:https://www.f2er.com/3119043.html

大家都在问