How to create a blocking background loader in Java 8?

Problem

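How do you create a proper background loader in Java 8? The conditions:

  • the data should be loaded in the background
  • after the loading, the data should be displayed
  • while the data is being loaded, no further requests should be accepted
  • if there were requests while the data was being loaded, another loading should be scheduled after a certain timeout (e.g. 5 seconds)

The purpose is, e.g., to have reload requests accepted without flooding the database with requests.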

MCVE

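Here's an MCVE. It consists of a background task which simulates the loading by simply invoking Thread.sleep for 2 seconds. The task is scheduled every second, which naturally leads to overlapping background loading tasks, and that is what should be avoided.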

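import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class LoadInBackgroundExample {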

  /**
   * A simple background task which should perform the data loading operation. In this minimal example it simply invokes Thread.sleep
   */
  public static class BackgroundTask implements Runnable {

    private int id;

    public BackgroundTask(int id) {
      this.id = id;
    }

    /**
     * Sleep for a given amount of time to simulate loading.
     */
    @Override
    public void run() {

      try {

        System.out.println("Start #" + id + ": " + Thread.currentThread());

        long sleepTime = 2000; 
        Thread.sleep( sleepTime);

      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        System.out.println("Finish #" + id + ": " + Thread.currentThread());
      }

    }
  }

  /**
   * CompletableFuture which simulates loading and showing data.
   * @param taskId Identifier of the current task
   */
  public static void loadInBackground( int taskId) {

    // create the loading task
    BackgroundTask backgroundTask = new BackgroundTask( taskId);

    // "load" the data asynchronously
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {

      @Override
      public String get() {

        CompletableFuture<Void> future = CompletableFuture.runAsync(backgroundTask);

        try {

          future.get();

        } catch (InterruptedException | ExecutionException e) {
          e.printStackTrace();
        }

        return "task " + backgroundTask.id;
      }
    });

    // display the data after they are loaded
    CompletableFuture<Void> future = completableFuture.thenAccept(x -> {

      System.out.println( "Background task finished:" + x);

    });

  }


  public static void main(String[] args) {

    // runnable which invokes the background loader every second
    Runnable trigger = new Runnable() {

      int taskId = 0;

      public void run() { 

        loadInBackground( taskId++);

      }
    };

    // create scheduler
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    // initial delay 0, then one trigger per second
    ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(trigger,0,1,TimeUnit.SECONDS);

    // cancel the scheduler and the application after 10 seconds
    scheduler.schedule(() -> beeperHandle.cancel(true),10,TimeUnit.SECONDS);

    try {
      beeperHandle.get();
    } catch (Throwable th) {
    }

    System.out.println( "Cancelled");
    System.exit(0);
  }

}

The output looks like this:

Start #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Start #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Start #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Finish #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 0
Finish #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Background task finished:task 1
Start #3: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 2
Start #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Start #5: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #3: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Background task finished:task 3
Start #6: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 4
Finish #5: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 5
Start #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #6: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Start #8: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 6
Start #9: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Finish #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 7
Start #10: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #8: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 8
Cancelled

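The goal is to have e.g. #1 and #2 skipped because #0 is still running.

Question

Where do you properly set the blocking mechanism? Should synchronization be used? Or some AtomicBoolean? And if so, should it be inside the get() method or elsewhere?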

Lephia answered: How to create a blocking background loader in Java 8?

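You already have a thread pool to execute the task. Running the task in yet another asynchronous executor (the ForkJoinPool, when you use CompletableFuture) is unnecessary and only makes things complicated.

Keep it simple and run the task on the scheduler's own thread. The ScheduledExecutorService ensures that only one task runs at a time when you invoke it with scheduleAtFixedRate, as its documentation states: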

  

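Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given period; that is, executions will commence after initialDelay, then initialDelay + period, then initialDelay + 2 * period, and so on. If any execution of the task encounters an exception, subsequent executions are suppressed. Otherwise, the task will only terminate via cancellation or termination of the executor. If any execution of this task takes longer than its period, then subsequent executions may start late, but will not concurrently execute.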
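To illustrate the point about scheduleAtFixedRate, here is a minimal sketch in which the simulated load runs directly on the scheduler thread instead of being handed to CompletableFuture. The class name DirectSchedulingSketch, the counters and the 100 ms / 200 ms timings are assumptions made up for this illustration; they are not part of the original code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DirectSchedulingSketch {

    // Hypothetical counters, only here to observe whether two loads ever overlap.
    static final AtomicInteger running = new AtomicInteger();
    static final AtomicInteger maxConcurrent = new AtomicInteger();

    // Simulated load, executed directly on the scheduler thread (no CompletableFuture).
    static void load() {
        int now = running.incrementAndGet();
        maxConcurrent.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(200); // deliberately longer than the 100 ms period
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            running.decrementAndGet();
        }
    }

    // Schedules load() every 100 ms for the given time and returns
    // the highest number of simultaneous loads that was observed.
    static int runFor(long millis) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        scheduler.scheduleAtFixedRate(DirectSchedulingSketch::load, 0, 100, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return maxConcurrent.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent loads: " + runFor(1000));
    }
}
```

Note that this only prevents overlap. Executions that come due while a load is still running are not dropped; they start late, one after another, so this alone does not add the pause asked for in the question.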

,

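Given the following requirements:

  • the data should be loaded in the background
  • after the loading, the data should be displayed
  • while the data is being loaded, no further requests should be accepted
  • if there were requests while the data was being loaded, another loading should be scheduled after a certain timeout (e.g. 5 seconds)

a solution can be built on Executors.newSingleThreadExecutor(), CompletableFuture and LinkedBlockingQueue: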

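import java.time.Duration;
import java.time.Instant;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class SingleThreadedLoader {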

  private static class BackgroundTask extends CompletableFuture<String> {

    private final String query;

    private BackgroundTask(String query) {
      this.query = query;
    }

    public String getQuery() {
      return query;
    }
  }

  private final BlockingQueue<BackgroundTask> tasks = new LinkedBlockingQueue<>();
  // while data are loaded no further requests should be accepted
  private final Executor executor = Executors.newSingleThreadExecutor();

  private final int delaySeconds;

  private AtomicReference<Instant> lastExecution = new AtomicReference<>(Instant.EPOCH);

  public SingleThreadedLoader(int delaySeconds) {
    this.delaySeconds = delaySeconds;
    setupLoading();
  }

  public BackgroundTask loadInBackground(String query) {
    log("Enqueued query " + query);
    BackgroundTask task = new BackgroundTask(query);
    tasks.add(task);
    return task;
  }

  private void setupLoading() {
    // data should be loaded in background
    executor.execute(() -> {
      while (true) {
        try {
          // if there were requests while the data were loaded
          // another loading should be scheduled after a certain timeout (e. g. 5 seconds)
          Instant prev = lastExecution.get();
          // getSeconds() is available in Java 8; Duration.toSeconds() only exists since Java 9
          long delay = Duration.between(prev,Instant.now()).getSeconds();
          if (delay < delaySeconds) {
            log("Waiting for 5 seconds before next data loading");
            TimeUnit.SECONDS.sleep(delaySeconds - delay);
          }
          BackgroundTask task = tasks.take();
          try {
            String query = task.getQuery();
            String data = loadData(query);
            task.complete(data);
          } catch (Exception e) {
            task.completeExceptionally(e);
          }
          lastExecution.set(Instant.now());
        } catch (InterruptedException e) {
          log(e.getMessage());
          return;
        }
      }
    });
  }

  private String loadData(String query) {
    try {
      log("Loading data for " + query);
      TimeUnit.SECONDS.sleep(2);
      log("Loaded data for " + query);
      return "Result " + query;
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  private static void log(String str) {
    String time = LocalTime.now().truncatedTo(ChronoUnit.SECONDS).format(DateTimeFormatter.ISO_TIME);
    String thread = Thread.currentThread().getName();
    System.out.println(time + ' ' + thread + ": " + str);
  }

  public static void main(String[] args) throws Exception {
    SingleThreadedLoader loader = new SingleThreadedLoader(5);
    // after the loading the data should be displayed
    loader.loadInBackground("1").thenAccept(SingleThreadedLoader::log);
    loader.loadInBackground("2").thenAccept(SingleThreadedLoader::log);
    loader.loadInBackground("3").thenAccept(SingleThreadedLoader::log);

    log("Do another work in the main thread");

    TimeUnit.SECONDS.sleep(30);
  }
}

After execution, stdout will show the following output:

10:29:26 main: Enqueued query 1
10:29:26 pool-1-thread-1: Loading data for 1
10:29:26 main: Enqueued query 2
10:29:26 main: Enqueued query 3
10:29:26 main: Do another work in the main thread
10:29:28 pool-1-thread-1: Loaded data for 1
10:29:28 pool-1-thread-1: Result 1
10:29:28 pool-1-thread-1: Waiting for 5 seconds before next data loading
10:29:33 pool-1-thread-1: Loading data for 2
10:29:36 pool-1-thread-1: Loaded data for 2
10:29:36 pool-1-thread-1: Result 2
10:29:36 pool-1-thread-1: Waiting for 5 seconds before next data loading
10:29:41 pool-1-thread-1: Loading data for 3
10:29:43 pool-1-thread-1: Loaded data for 3
10:29:43 pool-1-thread-1: Result 3
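
The queue-based loader above executes every enqueued request. If requests that arrive during a load should instead collapse into a single delayed reload (one possible reading of the last condition), a rough sketch could look like the following. CoalescingLoader, request() and the two flags are hypothetical names introduced for this illustration only.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class CoalescingLoader {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Runnable loadTask;
    private final long delayMillis;

    private boolean loading; // guarded by "this"
    private boolean pending; // guarded by "this"

    CoalescingLoader(Runnable loadTask, long delayMillis) {
        this.loadTask = loadTask;
        this.delayMillis = delayMillis;
    }

    // Starts a load if none is active; otherwise only remembers that a reload was asked for.
    synchronized void request() {
        if (loading) {
            pending = true;
        } else {
            loading = true;
            scheduler.execute(this::runLoad);
        }
    }

    private void runLoad() {
        try {
            loadTask.run();
        } finally {
            onLoadFinished();
        }
    }

    // All requests that arrived during the load collapse into one delayed reload.
    // "loading" stays true during the delay, so requests in that window are only remembered.
    private synchronized void onLoadFinished() {
        if (pending) {
            pending = false;
            scheduler.schedule(this::runLoad, delayMillis, TimeUnit.MILLISECONDS);
        } else {
            loading = false;
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        CoalescingLoader loader = new CoalescingLoader(() -> {
            System.out.println("loading on " + Thread.currentThread().getName());
            try {
                Thread.sleep(2000); // simulated load
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 5000);
        loader.request(); // starts the first load
        Thread.sleep(500);
        loader.request(); // arrives during the load: remembered
        loader.request(); // merged with the previous one
        Thread.sleep(10000);
        loader.shutdown();
    }
}
```

Both state changes happen under the same monitor, so a request cannot slip in between the check for pending work and the reset of the loading flag.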
,

I added an AtomicInteger that acts as a counter of running tasks, together with simple lock() and unlock() methods. With this small change to your original code, the output is:

Start #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
background task cancelled  1
background task cancelled  2
Finish #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 0
Start #3: Thread[ForkJoinPool.commonPool-worker-2,5,main]
background task cancelled  4
Finish #3: Thread[ForkJoinPool.commonPool-worker-2,5,main]
background task cancelled  5
Background task finished:task 3
Start #6: Thread[ForkJoinPool.commonPool-worker-3,5,main]
background task cancelled  7
Finish #6: Thread[ForkJoinPool.commonPool-worker-3,5,main]
background task cancelled  8
Background task finished:task 6
Start #9: Thread[ForkJoinPool.commonPool-worker-2,5,main]
background task cancelled  10
Cancelled

Here is my solution for your task:

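import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LoadInBackgroundExample {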
    //Added new exception
    public static class AlreadyIsRunningException extends RuntimeException {
        long taskId;

        public AlreadyIsRunningException(String message,long taskId) {
            super(message);
            this.taskId = taskId;
        }

        public long getTaskId() {
            return taskId;
        }

        public void setTaskId(long taskId) {
            this.taskId = taskId;
        }
    }


    /**
     * A simple background task which should perform the data loading operation. In this minimal example it simply invokes Thread.sleep
     */
    public static class BackgroundTask implements Runnable {


        //this atomicInteger acts as a global lock counter for BackgroundTask objects
        private static AtomicInteger counter = new AtomicInteger(0);


        private int id;

        public BackgroundTask(int id) {
            this.id = id;
        }

        private void unlock() {
            counter.decrementAndGet();
        }

        private void lock() {
            //we need to check this way to avoid some unlucky timing between threads
            int lockValue = counter.incrementAndGet();
            //if we got counter different than 1 that means that some other task is already running (it has already acquired the lock)
            if (lockValue != 1) {
                //rollback our check
                counter.decrementAndGet();
                //throw an exception
                throw new AlreadyIsRunningException("Some other task already is running",id);
            }
        }

        /**
         * Sleep for a given amount of time to simulate loading.
         */
        @Override
        public void run() {
            //Check if we can acquire lock

            lock();
            //we have a lock to
            try {
                System.out.println("Start #" + id + ": " + Thread.currentThread());
                long sleepTime = 2000;
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("Finish #" + id + ": " + Thread.currentThread());
                unlock();
            }
        }
    }

    /**
     * CompletableFuture which simulates loading and showing data.
     *
     * @param taskId Identifier of the current task
     */


    public static void loadInBackground(int taskId) {
        // create the loading task
        BackgroundTask backgroundTask = new BackgroundTask(taskId);
        // "load" the data asynchronously
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                CompletableFuture<Void> future = CompletableFuture.runAsync(backgroundTask);

                try {
                    future.get();
                } catch (ExecutionException e) {
                    if (e.getCause() instanceof AlreadyIsRunningException) {
                        System.out.println("background task cancelled  " + ((AlreadyIsRunningException) e.getCause()).getTaskId());
                        throw (AlreadyIsRunningException) e.getCause();
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "task " + backgroundTask.id;
            }
        });
        // display the data after they are loaded
        CompletableFuture<Void> future = completableFuture.thenAccept(x -> {
            System.out.println("Background task finished:" + x);
        });
    }

    ArrayList<BackgroundTask> backgroundTasks = new ArrayList<>();


    public static void main(String[] args) {
        // runnable which invokes the background loader every second
        Runnable trigger = new Runnable() {
            int taskId = 0;

            public void run() {
                loadInBackground(taskId++);
            }
        };

        // create scheduler
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        // initial delay 0, then one trigger per second
        ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(trigger,0,1,TimeUnit.SECONDS);

        // cancel the scheduler and the application after 10 seconds
        scheduler.schedule(() -> beeperHandle.cancel(true),10,TimeUnit.SECONDS);

        try {
            beeperHandle.get();
        } catch (Throwable th) {
        }

        System.out.println("Cancelled");
        System.exit(0);
    }
}

Update

I changed the lock() and unlock() methods to a simpler form:

        private static AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        private void unlock() {
            atomicBoolean.set(false);
        }
        private void lock() {
            //if 'changed' is false that means some other task is already running
            boolean changed = atomicBoolean.compareAndSet(false,true);
            if (!changed) {
                throw new AlreadyIsRunningException("Some other task  is already running",id);
            }
        }
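
The same compareAndSet guard can also be written without an exception, by returning an empty result when a load is already running. This is only a sketch of that variation; SkippingLoader and tryLoad are hypothetical names that do not appear in the code above.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

class SkippingLoader {
    private final AtomicBoolean loading = new AtomicBoolean(false);

    // Runs the given loader only if no other load is in progress; otherwise returns empty.
    <T> Optional<T> tryLoad(Supplier<T> loader) {
        if (!loading.compareAndSet(false, true)) {
            return Optional.empty(); // another load holds the flag: skip this request
        }
        try {
            return Optional.ofNullable(loader.get());
        } finally {
            loading.set(false); // always release the flag, even if the loader throws
        }
    }

    public static void main(String[] args) {
        SkippingLoader guard = new SkippingLoader();
        // The inner call happens while the outer load still holds the flag, so it is skipped.
        String result = guard.tryLoad(
                () -> guard.tryLoad(() -> "inner").isPresent() ? "nested ran" : "nested skipped")
                .orElse("outer skipped");
        System.out.println(result);
    }
}
```

The flag is not reentrant, which is why even a nested call from the same thread is rejected. Releasing it in a finally block keeps a failing load from blocking all later requests.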
,

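If I understand correctly, you have several tasks in the background at the same time. Since these tasks do exactly the same job, you don't want to execute them in parallel; you need one task to do the job and share its result with the others. So if you get 10 CompletableFutures concurrently, you want one of them to call 'reload' on the db and share the result of that execution with the others, so that all of the CompletableFutures complete normally with the result. I assume this from

  The goal is to have e.g. #1 and #2 skipped because #0 is still running.

and

  after the loading the data should be displayed

If my guess is correct, you can try my solution.

I have a kind of parent-child relationship between tasks. The parent task is the one that really does the job and shares its result with its children. A child task is one that was added while the parent task was still executing; it waits until the parent task finishes its execution. Since the parent task's result is still "fresh", it is copied to each child, and all of them complete their futures.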

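import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class BackgroundService {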


    public static class BackgroundJob implements Callable<String> {

        private static BackgroundJob ROOT_JOB = null;

        private synchronized static void addBackgroundJob(BackgroundJob backgroundJob) {
            if (ROOT_JOB != null) {
                ROOT_JOB.addChild(backgroundJob);
            } else {
                System.out.println();
                System.out.println(Thread.currentThread().getName() + " RUNNING ROOT TASK-" + backgroundJob.jobId);
                ROOT_JOB = backgroundJob;
            }
        }

        private synchronized static void unlock() {
            ROOT_JOB = null;
        }

        private final int jobId;
        private List<BackgroundJob> children = new ArrayList<>();
        private BackgroundJob parent;
        private String providedResultFromParent = null;


        public BackgroundJob(int jobId) {
            this.jobId = jobId;
        }

        private  void addChild(BackgroundJob backgroundJob) {
            backgroundJob.parent = this;
            this.children.add(backgroundJob);
        }

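        @Override
        public String call() throws Exception {
            addBackgroundJob(this);
            if (parent == null) {
                String result = logic();

                // Snapshot the children and release the root under the same class-level lock
                // that addBackgroundJob() uses, so no child can be added while results are handed out.
                List<BackgroundJob> waiting;
                synchronized (BackgroundJob.class) {
                    waiting = new ArrayList<>(children);
                    unlock();
                }
                for (final BackgroundJob backgroundJob : waiting) {
                    synchronized (backgroundJob) {
                        backgroundJob.providedResultFromParent = result;
                        backgroundJob.notify();
                    }
                }

                return "\t\tROOT task" + jobId + "'s " + result;
            } else {
                synchronized (this) {
                    System.out.println(Thread.currentThread().getName() + "\t\tskipping task-" + jobId + " and waiting running task-" + parent.jobId + " to finish");
                    // the loop guards against spurious wakeups and against a notify() that arrives before wait()
                    while (providedResultFromParent == null) {
                        this.wait();
                    }
                }
                return "\t\t\t\ttask-" + jobId + "'s " + providedResultFromParent;
            }
        }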

        private String logic() throws InterruptedException {
            Thread.sleep(2000);
            return (int) (Math.random() * 1000) + " ";
        }
    }

    public static void main(String[] args) throws InterruptedException,ExecutionException {
        AtomicInteger atomicInteger = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        Supplier<String> job = () -> {
            int taskId = atomicInteger.incrementAndGet();
            BackgroundJob backgroundJob = new BackgroundJob(taskId);
            try {
                return backgroundJob.call();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "finished " + taskId;
        };

        for (int i = 100; i > 0; i--) {
            CompletableFuture.supplyAsync(job,pool).thenAccept(s -> System.out.println(Thread.currentThread().getName()+" "+ s + " result is readable"));
            Thread.sleep((long) (Math.random() * 500));
        }

        // shutdown() has to come first: awaitTermination() only returns after a shutdown was requested
        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE,TimeUnit.DAYS);
    }
}

Here is the output:

pool-1-thread-1 RUNNING ROOT TASK-1
pool-1-thread-2     skipping task-2 and waiting running task-1 to finish
pool-1-thread-3     skipping task-3 and waiting running task-1 to finish
pool-1-thread-4     skipping task-4 and waiting running task-1 to finish
pool-1-thread-5     skipping task-5 and waiting running task-1 to finish
pool-1-thread-6     skipping task-6 and waiting running task-1 to finish
pool-1-thread-7     skipping task-7 and waiting running task-1 to finish
pool-1-thread-8     skipping task-8 and waiting running task-1 to finish
pool-1-thread-3                 task-3's 165  result is readable
pool-1-thread-8                 task-8's 165  result is readable
pool-1-thread-6                 task-6's 165  result is readable
pool-1-thread-5                 task-5's 165  result is readable
pool-1-thread-7                 task-7's 165  result is readable
pool-1-thread-2                 task-2's 165  result is readable
pool-1-thread-1         ROOT task1's 165  result is readable
pool-1-thread-4                 task-4's 165  result is readable

pool-1-thread-4 RUNNING ROOT TASK-9
pool-1-thread-1     skipping task-10 and waiting running task-9 to finish
pool-1-thread-2     skipping task-11 and waiting running task-9 to finish
pool-1-thread-7     skipping task-12 and waiting running task-9 to finish
pool-1-thread-5     skipping task-13 and waiting running task-9 to finish
pool-1-thread-8     skipping task-14 and waiting running task-9 to finish
pool-1-thread-6     skipping task-15 and waiting running task-9 to finish
pool-1-thread-3     skipping task-16 and waiting running task-9 to finish
pool-1-thread-9     skipping task-17 and waiting running task-9 to finish
pool-1-thread-10        skipping task-18 and waiting running task-9 to finish
pool-1-thread-1                 task-10's 370  result is readable
pool-1-thread-10                task-18's 370  result is readable
pool-1-thread-4         ROOT task9's 370  result is readable
pool-1-thread-9                 task-17's 370  result is readable
pool-1-thread-7                 task-12's 370  result is readable
pool-1-thread-6                 task-15's 370  result is readable
pool-1-thread-8                 task-14's 370  result is readable
pool-1-thread-2                 task-11's 370  result is readable
pool-1-thread-3                 task-16's 370  result is readable
pool-1-thread-5                 task-13's 370  result is readable

pool-1-thread-5 RUNNING ROOT TASK-19
pool-1-thread-3     skipping task-20 and waiting running task-19 to finish
pool-1-thread-2     skipping task-21 and waiting running task-19 to finish
pool-1-thread-8     skipping task-22 and waiting running task-19 to finish
pool-1-thread-6     skipping task-23 and waiting running task-19 to finish
pool-1-thread-7     skipping task-24 and waiting running task-19 to finish
pool-1-thread-9     skipping task-25 and waiting running task-19 to finish
pool-1-thread-4     skipping task-26 and waiting running task-19 to finish
pool-1-thread-10        skipping task-27 and waiting running task-19 to finish
pool-1-thread-1     skipping task-28 and waiting running task-19 to finish
pool-1-thread-5         ROOT task19's 574  result is readable
pool-1-thread-8                 task-22's 574  result is readable
pool-1-thread-4                 task-26's 574  result is readable
pool-1-thread-7                 task-24's 574  result is readable
pool-1-thread-6                 task-23's 574  result is readable
pool-1-thread-3                 task-20's 574  result is readable
pool-1-thread-9                 task-25's 574  result is readable
pool-1-thread-2                 task-21's 574  result is readable
pool-1-thread-1                 task-28's 574  result is readable
pool-1-thread-10                task-27's 574  result is readable

pool-1-thread-10 RUNNING ROOT TASK-29
pool-1-thread-1     skipping task-30 and waiting running task-29 to finish
pool-1-thread-2     skipping task-31 and waiting running task-29 to finish
pool-1-thread-9     skipping task-32 and waiting running task-29 to finish
pool-1-thread-3     skipping task-33 and waiting running task-29 to finish
pool-1-thread-6     skipping task-34 and waiting running task-29 to finish
pool-1-thread-7     skipping task-35 and waiting running task-29 to finish
pool-1-thread-4     skipping task-36 and waiting running task-29 to finish
pool-1-thread-8     skipping task-37 and waiting running task-29 to finish
pool-1-thread-5     skipping task-38 and waiting running task-29 to finish
pool-1-thread-11        skipping task-39 and waiting running task-29 to finish
pool-1-thread-1                 task-30's 230  result is readable
pool-1-thread-11                task-39's 230  result is readable
pool-1-thread-8                 task-37's 230  result is readable
pool-1-thread-5                 task-38's 230  result is readable
pool-1-thread-4                 task-36's 230  result is readable
pool-1-thread-7                 task-35's 230  result is readable

pool-1-thread-12 RUNNING ROOT TASK-40
pool-1-thread-6                 task-34's 230  result is readable
pool-1-thread-10        ROOT task29's 230  result is readable
pool-1-thread-3                 task-33's 230  result is readable
pool-1-thread-9                 task-32's 230  result is readable
pool-1-thread-2                 task-31's 230  result is readable
pool-1-thread-2     skipping task-41 and waiting running task-40 to finish
pool-1-thread-9     skipping task-42 and waiting running task-40 to finish
pool-1-thread-3     skipping task-43 and waiting running task-40 to finish
pool-1-thread-10        skipping task-44 and waiting running task-40 to finish
pool-1-thread-6     skipping task-45 and waiting running task-40 to finish
pool-1-thread-7     skipping task-46 and waiting running task-40 to finish
pool-1-thread-2                 task-41's 282  result is readable
pool-1-thread-10                task-44's 282  result is readable
pool-1-thread-6                 task-45's 282  result is readable
pool-1-thread-7                 task-46's 282  result is readable
pool-1-thread-3                 task-43's 282  result is readable
pool-1-thread-9                 task-42's 282  result is readable
pool-1-thread-12        ROOT task40's 282  result is readable

pool-1-thread-12 RUNNING ROOT TASK-47
pool-1-thread-9     skipping task-48 and waiting running task-47 to finish
pool-1-thread-3     skipping task-49 and waiting running task-47 to finish
pool-1-thread-7     skipping task-50 and waiting running task-47 to finish
pool-1-thread-6     skipping task-51 and waiting running task-47 to finish
pool-1-thread-10        skipping task-52 and waiting running task-47 to finish
pool-1-thread-2     skipping task-53 and waiting running task-47 to finish
pool-1-thread-12        ROOT task47's 871  result is readable
pool-1-thread-10                task-52's 871  result is readable
pool-1-thread-2                 task-53's 871  result is readable
pool-1-thread-3                 task-49's 871  result is readable
pool-1-thread-6                 task-51's 871  result is readable
pool-1-thread-7                 task-50's 871  result is readable
pool-1-thread-9                 task-48's 871  result is readable

pool-1-thread-9 RUNNING ROOT TASK-54
pool-1-thread-7     skipping task-55 and waiting running task-54 to finish
pool-1-thread-6     skipping task-56 and waiting running task-54 to finish
pool-1-thread-3     skipping task-57 and waiting running task-54 to finish
pool-1-thread-2     skipping task-58 and waiting running task-54 to finish
pool-1-thread-10        skipping task-59 and waiting running task-54 to finish
pool-1-thread-12        skipping task-60 and waiting running task-54 to finish
pool-1-thread-4     skipping task-61 and waiting running task-54 to finish
pool-1-thread-5     skipping task-62 and waiting running task-54 to finish
pool-1-thread-9         ROOT task54's 345  result is readable
pool-1-thread-2                 task-58's 345  result is readable
pool-1-thread-5                 task-62's 345  result is readable
pool-1-thread-7                 task-55's 345  result is readable
pool-1-thread-10                task-59's 345  result is readable
pool-1-thread-6                 task-56's 345  result is readable
pool-1-thread-3                 task-57's 345  result is readable
pool-1-thread-4                 task-61's 345  result is readable
pool-1-thread-12                task-60's 345  result is readable

pool-1-thread-12 RUNNING ROOT TASK-63
pool-1-thread-4     skipping task-64 and waiting running task-63 to finish
pool-1-thread-3     skipping task-65 and waiting running task-63 to finish
pool-1-thread-6     skipping task-66 and waiting running task-63 to finish
pool-1-thread-10        skipping task-67 and waiting running task-63 to finish
pool-1-thread-7     skipping task-68 and waiting running task-63 to finish
pool-1-thread-5     skipping task-69 and waiting running task-63 to finish
pool-1-thread-2     skipping task-70 and waiting running task-63 to finish
pool-1-thread-12        ROOT task63's 670  result is readable
pool-1-thread-2                 task-70's 670  result is readable
pool-1-thread-5                 task-69's 670  result is readable
pool-1-thread-7                 task-68's 670  result is readable
pool-1-thread-10                task-67's 670  result is readable
pool-1-thread-6                 task-66's 670  result is readable
pool-1-thread-3                 task-65's 670  result is readable
pool-1-thread-4                 task-64's 670  result is readable

pool-1-thread-4 RUNNING ROOT TASK-71
pool-1-thread-3     skipping task-72 and waiting running task-71 to finish
pool-1-thread-6     skipping task-73 and waiting running task-71 to finish
pool-1-thread-10        skipping task-74 and waiting running task-71 to finish
pool-1-thread-7     skipping task-75 and waiting running task-71 to finish
pool-1-thread-5     skipping task-76 and waiting running task-71 to finish
pool-1-thread-2     skipping task-77 and waiting running task-71 to finish
pool-1-thread-12        skipping task-78 and waiting running task-71 to finish
pool-1-thread-9     skipping task-79 and waiting running task-71 to finish
pool-1-thread-8     skipping task-80 and waiting running task-71 to finish
pool-1-thread-4         ROOT task71's 445  result is readable
pool-1-thread-6                 task-73's 445  result is readable
pool-1-thread-9                 task-79's 445  result is readable
pool-1-thread-3                 task-72's 445  result is readable
pool-1-thread-8                 task-80's 445  result is readable
pool-1-thread-12                task-78's 445  result is readable
pool-1-thread-5                 task-76's 445  result is readable
pool-1-thread-10                task-74's 445  result is readable
pool-1-thread-2                 task-77's 445  result is readable
pool-1-thread-7                 task-75's 445  result is readable

pool-1-thread-7 RUNNING ROOT TASK-81
pool-1-thread-2     skipping task-82 and waiting running task-81 to finish
pool-1-thread-10        skipping task-83 and waiting running task-81 to finish
pool-1-thread-5     skipping task-84 and waiting running task-81 to finish
pool-1-thread-12        skipping task-85 and waiting running task-81 to finish
pool-1-thread-8     skipping task-86 and waiting running task-81 to finish
pool-1-thread-3     skipping task-87 and waiting running task-81 to finish
pool-1-thread-9     skipping task-88 and waiting running task-81 to finish
pool-1-thread-6     skipping task-89 and waiting running task-81 to finish
pool-1-thread-7         ROOT task81's 141  result is readable
pool-1-thread-6                 task-89's 141  result is readable
pool-1-thread-9                 task-88's 141  result is readable
pool-1-thread-3                 task-87's 141  result is readable
pool-1-thread-10                task-83's 141  result is readable
pool-1-thread-5                 task-84's 141  result is readable
pool-1-thread-12                task-85's 141  result is readable
pool-1-thread-8                 task-86's 141  result is readable
pool-1-thread-2                 task-82's 141  result is readable

pool-1-thread-2 RUNNING ROOT TASK-90
pool-1-thread-8     skipping task-91 and waiting running task-90 to finish
pool-1-thread-12        skipping task-92 and waiting running task-90 to finish
pool-1-thread-5     skipping task-93 and waiting running task-90 to finish
pool-1-thread-10        skipping task-94 and waiting running task-90 to finish
pool-1-thread-3     skipping task-95 and waiting running task-90 to finish
pool-1-thread-9     skipping task-96 and waiting running task-90 to finish
pool-1-thread-6     skipping task-97 and waiting running task-90 to finish
pool-1-thread-7     skipping task-98 and waiting running task-90 to finish
pool-1-thread-4     skipping task-99 and waiting running task-90 to finish
pool-1-thread-11        skipping task-100 and waiting running task-90 to finish
pool-1-thread-2         ROOT task90's 321  result is readable
pool-1-thread-3                 task-95's 321  result is readable
pool-1-thread-7                 task-98's 321  result is readable
pool-1-thread-8                 task-91's 321  result is readable
pool-1-thread-11                task-100's 321  result is readable
pool-1-thread-4                 task-99's 321  result is readable
pool-1-thread-5                 task-93's 321  result is readable
pool-1-thread-9                 task-96's 321  result is readable
pool-1-thread-12                task-92's 321  result is readable
pool-1-thread-10                task-94's 321  result is readable
pool-1-thread-6                 task-97's 321  result is readable
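
The idea of one running job whose result is shared with all callers that arrive in the meantime can also be sketched without wait()/notify(), by handing every caller the same in-flight CompletableFuture. This is a hedged alternative, not the code that produced the output above; SharedLoad and its members are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class SharedLoad<T> {
    // The load currently in progress, or null if none is running.
    private final AtomicReference<CompletableFuture<T>> inFlight = new AtomicReference<>();
    private final Supplier<T> loader;
    private final Executor executor;

    SharedLoad(Supplier<T> loader, Executor executor) {
        this.loader = loader;
        this.executor = executor;
    }

    // Returns the future of the running load, or starts a new load if none is running.
    CompletableFuture<T> load() {
        while (true) {
            CompletableFuture<T> current = inFlight.get();
            if (current != null) {
                return current; // join the load that is already running
            }
            CompletableFuture<T> fresh = new CompletableFuture<>();
            if (inFlight.compareAndSet(null, fresh)) {
                executor.execute(() -> run(fresh));
                return fresh;
            }
            // another thread installed its future first: retry and join that one
        }
    }

    private void run(CompletableFuture<T> mine) {
        try {
            T value = loader.get();
            inFlight.compareAndSet(mine, null); // later callers trigger a new load
            mine.complete(value);
        } catch (Throwable t) {
            inFlight.compareAndSet(mine, null);
            mine.completeExceptionally(t);
        }
    }

    public static void main(String[] args) {
        SharedLoad<String> shared = new SharedLoad<>(() -> "fresh data", ForkJoinPool.commonPool());
        CompletableFuture<String> first = shared.load();
        CompletableFuture<String> second = shared.load(); // the same future if the first load is still running
        System.out.println(first.join() + " / " + second.join());
    }
}
```

Callers attach thenAccept to the returned future, so nobody blocks a pool thread while waiting for the running load.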
,

If you want only one thread to have access at a time, a simple synchronized does the job...

Output:

Start #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Finish #0: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 0 finished getting data...
Start #3: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Finish #2: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Start #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 2 finished getting data...
Finish #1: Thread[ForkJoinPool.commonPool-worker-4,5,main]
Background task finished:task 1 finished getting data...
Start #6: Thread[ForkJoinPool.commonPool-worker-3,5,main]
Finish #4: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Start #5: Thread[ForkJoinPool.commonPool-worker-6,5,main]
Background task finished:task 4 finished getting data...
Finish #3: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Start #7: Thread[ForkJoinPool.commonPool-worker-2,5,main]
Background task finished:task 3 finished getting data...
Cancelled

Code:

package queue;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class LoadInBackgroundExample {

    public static class SyncronizedBackend {
        public synchronized String getData() {

            long sleepTime = 2000;
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                // restore the interrupt flag so callers can still observe the interruption
                Thread.currentThread().interrupt();
            }

            return "finished getting data...";
        }
    }

    /**
     * A simple background task which should perform the data loading operation. In
     * this minimal example it simply invokes Thread.sleep
     */
    public static class BackgroundTask implements Runnable {

        private int id;
        private SyncronizedBackend syncronizedBackend;
        private String result;

        public BackgroundTask(SyncronizedBackend syncronizedBackend,int id) {
            this.syncronizedBackend = syncronizedBackend;
            this.id = id;
        }

        /**
         * Sleep for a given amount of time to simulate loading.
         */
        @Override
        public void run() {

            System.out.println("Start #" + id + ": " + Thread.currentThread());

            result = this.syncronizedBackend.getData();

            System.out.println("Finish #" + id + ": " + Thread.currentThread());
        }

        public String getResult() {
            return result;
        }
    }

    /**
     * CompletableFuture which simulates loading and showing data.
     * @param syncronizedBackend 
     * 
     * @param taskId Identifier of the current task
     */
    public static void loadInBackground(SyncronizedBackend syncronizedBackend,int taskId) {

        // create the loading task
        BackgroundTask backgroundTask = new BackgroundTask(syncronizedBackend,taskId);

        // "load" the data asynchronously
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {

                CompletableFuture<Void> future = CompletableFuture.runAsync(backgroundTask);

                try {

                    future.get();

                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }

                return "task " + backgroundTask.id + " " + backgroundTask.getResult();
            }
        });

        // display the data after they are loaded
        CompletableFuture<Void> future = completableFuture.thenAccept(x -> {

            System.out.println("Background task finished:" + x);

        });

    }

    public static void main(String[] args) {

        SyncronizedBackend syncronizedBackend = new SyncronizedBackend();

        // runnable which invokes the background loader every second
        Runnable trigger = new Runnable() {

            int taskId = 0;

            public void run() {

                loadInBackground(syncronizedBackend,taskId++);

            }
        };

        // create scheduler
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        // the 1-second period follows the comment above; the initial delay of 0 is an assumption
        ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(trigger, 0, 1, TimeUnit.SECONDS);

        try {
            beeperHandle.get();
        } catch (Throwable th) {
        }

        System.out.println("Cancelled");
        System.exit(0);
    }

}
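
The synchronized backend above serializes overlapping requests rather than rejecting them, which is why every task in the output eventually runs. If requests that arrive during a load should be dropped instead, one option is a non-blocking lock attempt. The following is only a minimal sketch of that idea, not part of the answer's code: the class name SkippingBackend, the method tryGetData and the injected loader are hypothetical names chosen for illustration.

```java
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical variant of SyncronizedBackend that rejects callers instead of queueing them. */
final class SkippingBackend {

    private final ReentrantLock lock = new ReentrantLock();
    private final Supplier<String> loader; // the actual (slow) loading operation

    SkippingBackend(Supplier<String> loader) {
        this.loader = loader;
    }

    /** Returns the loaded data, or Optional.empty() if another thread is loading right now. */
    public Optional<String> tryGetData() {
        if (!lock.tryLock()) {
            return Optional.empty(); // a load is already in progress: skip this request
        }
        try {
            return Optional.of(loader.get());
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SkippingBackend backend = new SkippingBackend(() -> {
            try {
                Thread.sleep(2000); // simulated load, as in the answer above
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "finished getting data...";
        });

        Thread first = new Thread(() -> System.out.println("first: " + backend.tryGetData()));
        first.start();
        Thread.sleep(500); // give the first request time to take the lock
        System.out.println("second: " + backend.tryGetData()); // arrives while the first load is still running
        first.join();
    }
}
```

A caller that receives Optional.empty() knows its request was skipped and can decide whether to retry later. ReentrantLock is reentrant, so the skip only applies to requests coming from other threads.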

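I tried a solution that uses a dual-thread switch; see the class BackgroundTaskDualSwitch, which simulates loading with a CompletableFuture. The idea is to make a second task wait until the currently running task has finished; see the change in BackgroundTask. This guarantees that at most one task thread is running and at most one task thread is waiting. Any further request is skipped until the running task has finished and its slot becomes free to handle the next request.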

public static class BackgroundTask extends Thread {

  private int id;
  private Thread pendingTask;

  public BackgroundTask(int id) {
    this.id = id;
  }

  public BackgroundTask(int id,Thread pendingTask) {
    this(id);
    this.pendingTask = pendingTask;
  }

  /**
   * Sleep for a given amount of time to simulate loading.
   */
  @Override
  public void run() {

    try {

      if (pendingTask != null && pendingTask.isAlive()) {
        pendingTask.join();
      }

      System.out.println("Start #" + id + ": " + Thread.currentThread());
      ...
    }
  }

public static class BackgroundTaskDualSwitch {

  private static BackgroundTask task1;
  private static BackgroundTask task2;

  public static synchronized boolean runTask(int taskId) {
    if (! isBusy(task1)) {
      if (isBusy(task2)) {
        task1 = new BackgroundTask(taskId,task2);
      } else {
        task1 = new BackgroundTask(taskId);
      }
      runAsync(task1);
      return true;
    } else if (! isBusy(task2)) {
      if (isBusy(task1)) {
        task2 = new BackgroundTask(taskId,task1);
      } else {
        task2 = new BackgroundTask(taskId);
      }
      runAsync(task2);
      return true;
    } else {
      return false; // SKIPPED
    }
  }

  private static void runAsync(BackgroundTask task) {
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(new Supplier<String>() {

      @Override
      public String get() {

        try {
          task.start();
          task.join();
        }
        catch (InterruptedException e) {
          e.printStackTrace();
        }
        return "task " + task.id;
      }
    });

    // display the data after they are loaded
    CompletableFuture<Void> future = completableFuture.thenAccept(x -> {

      System.out.println( "Background task finished:" + x); 

    });
  }

  private static boolean isBusy(BackgroundTask task) {
    return task != null && task.isAlive();
  }
}

/**
 * Simulates loading and showing data.
 * @param taskId Identifier of the current task
 */
public static void loadInBackground(int taskId) {

  // create the loading task
  if (! BackgroundTaskDualSwitch.runTask(taskId)) {
    System.out.println( "Background task ignored:task " + taskId); // SKIPPED
  }
}
...

The output is:

Start #0: Thread[Thread-0,main]
Background task ignored:task 2
Finish #0: Thread[Thread-0,main]
Start #1: Thread[Thread-1,main]
Background task finished:task 0
Background task ignored:task 4
Finish #1: Thread[Thread-1,main]
Start #3: Thread[Thread-2,main]
Background task finished:task 1
Background task ignored:task 6
Finish #3: Thread[Thread-2,main]
Start #5: Thread[Thread-3,main]
Background task finished:task 3
Background task ignored:task 8
Finish #5: Thread[Thread-3,main]
Start #7: Thread[Thread-4,main]
Background task finished:task 5
Background task ignored:task 10
Finish #7: Thread[Thread-4,main]
Start #9: Thread[Thread-5,main]
Background task finished:task 7
Cancelled
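
The dual switch skips requests once both slots are busy. The question additionally asks that a request arriving during a load should lead to one more load after a timeout. The following is a rough sketch of that rule under my own assumptions and is not taken from any of the answers: the class ReloadingLoader, its request method and the reloadDelayMillis parameter are hypothetical. State changes are guarded by synchronized so that a request cannot slip in between "load finished" and "check for a pending reload".

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical loader: one load at a time, plus one delayed reload if requests arrived meanwhile. */
final class ReloadingLoader {

    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Runnable loadTask;
    private final long reloadDelayMillis;
    private boolean loading;          // guarded by "this"
    private boolean reloadRequested;  // guarded by "this"

    ReloadingLoader(Runnable loadTask, long reloadDelayMillis) {
        this.loadTask = loadTask;
        this.reloadDelayMillis = reloadDelayMillis;
    }

    /** Returns true if this request started a load, false if it was folded into a later reload. */
    synchronized boolean request() {
        if (!loading) {
            loading = true;
            executor.execute(this::runLoad);
            return true;
        }
        reloadRequested = true; // remember that somebody asked while we were busy
        return false;
    }

    private void runLoad() {
        try {
            loadTask.run();
        } finally {
            finishLoad();
        }
    }

    private synchronized void finishLoad() {
        loading = false;
        if (reloadRequested) {
            reloadRequested = false;
            // all requests seen during the load collapse into this single delayed reload
            executor.schedule(() -> { request(); }, reloadDelayMillis, TimeUnit.MILLISECONDS);
        }
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) throws InterruptedException {
        ReloadingLoader loader = new ReloadingLoader(() -> {
            System.out.println("loading...");
            try {
                Thread.sleep(2000); // simulated load
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("loaded");
        }, 5000);

        for (int i = 0; i < 3; i++) {
            System.out.println("request " + i + " started a load: " + loader.request());
            Thread.sleep(1000);
        }
        Thread.sleep(8000); // leave time for the deferred reload (5 s delay plus 2 s load)
        loader.shutdown();
    }
}
```

However many requests arrive during a load, they result in a single reload, so the database sees at most one extra query per loading cycle. In this sketch the executor must not be shut down while a load is still running, because scheduling the reload would then be rejected.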

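The first thread that starts the expensive work receives the result through a callback. Every other thread that tries to execute it is registered in ExpensiveWork.notificables, so once the expensive work is done, the thread that performed it notifies them as well.

In the meantime, each thread checks for the result every 5 seconds.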

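import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Test; // JUnit 4 is assumed; the original does not show its imports

public class ExpensiveWorkTest {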

    private final static int THREADS = 20;
    private final static long THREAD_TIMEOUT = 5000L;

    @Test
    public void example() throws InterruptedException {
        ExpensiveWork<String> expensiveWork = new ExpensiveWorkImpl();
        ExecutorService service = Executors.newFixedThreadPool(THREADS);
        for(int i=0; i<THREADS;i++) {
            service.execute(() ->{
                Notificable<String> notificable = new NotificableImpl();
                expensiveWork.execute(notificable);
                while(notificable.getExpensiveResult() == null) {
                    try {
                        Thread.sleep(THREAD_TIMEOUT);
                    } catch (InterruptedException e) {}
                }
                System.out.println(Thread.currentThread().getName()+" has the message: "+notificable.getExpensiveResult());
            });
        }
        service.shutdown(); // without this, awaitTermination always waits for the full timeout
        service.awaitTermination(60,TimeUnit.SECONDS);
    }

    public static abstract class ExpensiveWork<T> {

        private final AtomicBoolean runnning = new AtomicBoolean(false);
        private List<Notificable<T>> notificables = Collections.synchronizedList(new ArrayList<>());

        public void execute(Notificable<T> notificable) {
            String id = Thread.currentThread().getName();
            System.out.println("Loading data for "+id);
            notificables.add(notificable);
            if(!runnning.getAndSet(true)) {
                System.out.println("Running the expensive work "+id);
                T expensiveResult = expensiveWork();
                notificables.stream().forEach(n -> n.callback(expensiveResult));
            } else {
                System.out.println(id+" will receive the response later");
            }
        }

        protected abstract T expensiveWork();

    }

    public static class ExpensiveWorkImpl extends ExpensiveWork<String>{

        @Override
        public String expensiveWork() {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {}
            return "<Expensive result>";
        }

    }

    public static interface Notificable<T> {
        void callback(T expensiveResult);
        T getExpensiveResult();
    }

    public static class NotificableImpl implements Notificable<String> {

        private volatile String expensiveResult;

        @Override
        public void callback(String expensiveResult) {
            this.expensiveResult = expensiveResult;
        }

        @Override
        public String getExpensiveResult() {
            return expensiveResult;
        }

    }

}

This is the output:

Loading data for pool-1-thread-6
Loading data for pool-1-thread-7
Loading data for pool-1-thread-9
pool-1-thread-9 will receive the response later
Loading data for pool-1-thread-8
pool-1-thread-8 will receive the response later
Loading data for pool-1-thread-1
Loading data for pool-1-thread-2
Loading data for pool-1-thread-12
pool-1-thread-12 will receive the response later
Loading data for pool-1-thread-3
pool-1-thread-3 will receive the response later
Loading data for pool-1-thread-4
Loading data for pool-1-thread-5
pool-1-thread-5 will receive the response later
pool-1-thread-4 will receive the response later
Loading data for pool-1-thread-14
pool-1-thread-14 will receive the response later
Loading data for pool-1-thread-13
Loading data for pool-1-thread-15
pool-1-thread-15 will receive the response later
pool-1-thread-2 will receive the response later
pool-1-thread-1 will receive the response later
Loading data for pool-1-thread-11
pool-1-thread-11 will receive the response later
Loading data for pool-1-thread-10
pool-1-thread-10 will receive the response later
pool-1-thread-7 will receive the response later
Running the expensive work pool-1-thread-6
Loading data for pool-1-thread-18
pool-1-thread-18 will receive the response later
Loading data for pool-1-thread-17
Loading data for pool-1-thread-16
pool-1-thread-16 will receive the response later
pool-1-thread-13 will receive the response later
Loading data for pool-1-thread-19
pool-1-thread-19 will receive the response later
pool-1-thread-17 will receive the response later
Loading data for pool-1-thread-20
pool-1-thread-20 will receive the response later
pool-1-thread-6 has the message: <Expensive result>
pool-1-thread-8 has the message: <Expensive result>
pool-1-thread-12 has the message: <Expensive result>
pool-1-thread-9 has the message: <Expensive result>
pool-1-thread-11 has the message: <Expensive result>
pool-1-thread-1 has the message: <Expensive result>
pool-1-thread-2 has the message: <Expensive result>
pool-1-thread-3 has the message: <Expensive result>
pool-1-thread-15 has the message: <Expensive result>
pool-1-thread-4 has the message: <Expensive result>
pool-1-thread-14 has the message: <Expensive result>
pool-1-thread-10 has the message: <Expensive result>
pool-1-thread-5 has the message: <Expensive result>
pool-1-thread-13 has the message: <Expensive result>
pool-1-thread-16 has the message: <Expensive result>
pool-1-thread-19 has the message: <Expensive result>
pool-1-thread-20 has the message: <Expensive result>
pool-1-thread-7 has the message: <Expensive result>
pool-1-thread-18 has the message: <Expensive result>
pool-1-thread-17 has the message: <Expensive result>
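
In the code above the waiting threads poll every 5 seconds, and the running flag is never reset, so the work can only run once. One possible alternative, shown here purely as a sketch and not as the answer's method, is to let all callers share a single CompletableFuture for the load in progress. The class name SharedLoad and its members are hypothetical. As in the answer, the first caller runs the expensive work on its own thread; the others receive the same future and are completed as soon as the result is available.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical helper: concurrent callers share the result of one in-flight load. */
final class SharedLoad<T> {

    private final AtomicReference<CompletableFuture<T>> inFlight = new AtomicReference<>();
    private final Supplier<T> expensiveWork;

    SharedLoad(Supplier<T> expensiveWork) {
        this.expensiveWork = expensiveWork;
    }

    /** Starts the work if nobody else is running it, otherwise joins the running load. */
    CompletableFuture<T> execute() {
        while (true) {
            CompletableFuture<T> current = inFlight.get();
            if (current != null) {
                return current; // another thread is loading: share its future
            }
            CompletableFuture<T> mine = new CompletableFuture<>();
            if (inFlight.compareAndSet(null, mine)) {
                try {
                    mine.complete(expensiveWork.get()); // runs on the calling thread
                } catch (Throwable t) {
                    mine.completeExceptionally(t);
                } finally {
                    inFlight.set(null); // allow a fresh load for later requests
                }
                return mine;
            }
            // lost the race to register a future: loop and join the winner's load
        }
    }

    public static void main(String[] args) {
        SharedLoad<String> load = new SharedLoad<>(() -> {
            try {
                Thread.sleep(1000); // simulated expensive work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "<Expensive result>";
        });

        for (int i = 0; i < 4; i++) {
            new Thread(() -> System.out.println(
                    Thread.currentThread().getName() + " has the message: " + load.execute().join())).start();
        }
    }
}
```

Callers that do not want to block can attach a callback with thenAccept instead of calling join. Because the reference is cleared after each load, a request that arrives after completion starts a new load rather than reusing a stale result.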