如何在没有队列的情况下实现此并发结构?

在我的应用程序中,有一种情况发生,事件进入并且处理事件的线程(信令线程)必须向另一个处于闲置状态的线程(工作线程)发出信号,表明它可以运行一些代码。工作线程完成后,应等待再次发出信号。当工作线程正在工作时,事件可能会到达。在这种情况下,它应该继续运行并立即工作。工作线程执行的一项操作足以应付任何数量的传入事件,因此无需为每个事件执行一次,而只需在每个事件之后尽快进行一次。正确行为示例:

event comes in
worker thread starts work
worker thread finishes work
event comes in
worker thread starts work
event comes in
event comes in
worker thread finishes work
worker thread starts work
worker thread finishes work

4个事件,3个工作周期。不幸的是,但不可避免的要求是信令线程在处理事件时不能阻塞。目前,我已经使用BlockingQueue实现了此功能,即使内容没有意思甚至没有看,它也具有填满自身的毫无意义的副作用。我原本希望能够使用CountDownLatch或cyclicBarrier或类似工具来完成这项工作,但我一直找不到方法。这是我的实现:

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class Main {

    private static final class MyBarrier {
        private BlockingQueue<Boolean> queue = new LinkedBlockingQueue<>();
        void await() throws InterruptedException {
            queue.take();
            queue.clear();
        }
        void signal() {
            queue.add(true);
        }
    }

    private static Random random = new Random(0);

    private static void sleepForMax(int maxMillis) {
        sleep(random.nextInt(maxMillis));
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        MyBarrier myBarrier = new MyBarrier();
        final ExecutorService singallingThread = Executors.newSingleThreadExecutor();
        singallingThread.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                sleepForMax(1_000); // simulate period between events arriving
                myBarrier.signal();
                System.out.println("Signalling work to be done");
            }
            System.out.println("Thread interrupted");
        });
        final ExecutorService workingThread = Executors.newSingleThreadExecutor();
        workingThread.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    System.out.println("Waiting for work");
                    myBarrier.await();
                } catch (InterruptedException e) {
                    break;
                }
                System.out.println("Doing work...");
                sleepForMax(3_000); // simulate work being done
                System.out.println("Work done");
            }
            System.out.println("Thread interrupted");
        });
        sleep(10_000);
        singallingThread.shutdownNow();
        workingThread.shutdownNow();
    }

}

有什么更好的方法?

cmswwy 回答:如何在没有队列的情况下实现此并发结构?

我正在使用java.util.concurrent.Phaser进行实验,该方法可能有效,但是我以前没有使用Phaser,所以不确定。

private static final class MyBarrier2 {
    private Phaser phaser = new Phaser(1);
    void await() throws InterruptedException {
        phaser.awaitAdvanceInterruptibly(phaser.getPhase());
    }
    void signal() {
        phaser.arrive();
    }
}
,

当我在使用Phaser的实现中运行您的代码时,更改了睡眠时间,使得每800毫秒发生一次信号发送,而处理则需要1000毫秒,我得到了此输出:

00008: Waiting for work
00808: Signalling work to be done
00808: Doing work...                      <-- worker starts working
01608: Signalling work to be done         <-- signal came,so there's more work
01808: Work done
01809: Waiting for work                   <-- waits for work...
02409: Signalling work to be done         <-- ...for 600 ms,until the next signal
02409: Doing work...

(左边的数字是从开始算起的毫秒数。此外,您可以在代码中随机延迟地重现它,但这在这里很难重现和看到。)

如果我正确理解,这是错误的。例如。想象一下如果信号停止传来会发生什么。

您的代码可能可以针对您的具体情况进行此调整:

private static final class MyBarrierWithPhaser {

    private final Phaser phaser = new Phaser(1);
    private int lastObservedPhase; // Phaser has initial phase 0

    void await() throws InterruptedException {
        // only works for 1 producer 1 worker; lastObservedPhase is kind of thread-local
        lastObservedPhase = phaser.awaitAdvanceInterruptibly(lastObservedPhase);
    }

    void signal() {
        phaser.arrive();
    }
}

这样,工作程序将记录它前进到的最后一个阶段,如果信号线程在下一个awaitAdvanceInterruptibly之前“到达”,则相位器阶段将被更新,并且当工作程序尝试使用陈旧阶段等待时,它将立即进行;如果信号线程没有在awaitAdvanceInterruptibly之前到达,则工作人员将等待直到信号线程最终到达。

使用更简单的同步原语,我可以想到如何使用synchronized-wait()-notify()机制来实现它:

private static final class MyBarrierWithSynchronized {

    private boolean hasWork = false;

    synchronized void await() throws InterruptedException {
        while (!hasWork) {
            wait();
        }
        hasWork = false;
    }

    synchronized void signal() {
        hasWork = true;
        notifyAll(); // or notify() if we are sure there is 1 signal thread and 1 worker thread
    }
}

它有几个缺点:如果线程正在等待进入await(),则不会中断this。另外,有些人不喜欢在java.util.concurrent.*上进行同步,为了简短起见,我将其保留下来。可以使用private static final class MyBarrierWithLock { private boolean hasWorkFlag = false; private final Lock lock = new ReentrantLock(); private final Condition hasWorkCond = lock.newCondition(); void await() throws InterruptedException { lock.lockInterruptibly(); try { while (!hasWorkFlag) { hasWorkCond.await(); } hasWorkFlag = false; } finally { lock.unlock(); } } void signal() { lock.lock(); try { hasWorkFlag = true; hasWorkCond.signalAll(); // or signal() if we are sure there is 1 signal thread and 1 worker thread } finally { lock.unlock(); } } } 类似物重写此代码,此实现不会同时具有以下两个缺点:

{{1}}
本文链接:https://www.f2er.com/3113099.html

大家都在问