如何使线程/作业等待直到上一个作业完成,然后在某些通知上恢复它

我有一条骆驼路线,它将对给定的ID进行更新。

对于单个请求,一切正常,例如如果我有多个具有相同ID的请求,例如10,那么我希望一次执行单个请求

或等待具有相同ID的其他请求,同时,如果我有具有其他ID的请求而无需等待12就可以处理。

为此,我在骆驼上下文“ addRequestToPool”中创建了一条附加路由,它将所有到达骆驼“ updateTicket”的请求路由到1个地方,并应从那里进行1个1个处理。

我要实现以下类似的功能

当请求到达“ synchronizeRequests”方法时,它应检查是否已经存在带有docketId的作业,是否有等待直到旧进程完成(例如10)。如果没有这样的ID(例如12),则继续处理。

一旦旧进程完成,它应该告诉其他具有相同ID的作业等待,其中1个可以从他们等待的地方继续进行工作,以便发送响应。

请求收集是通过路由“ addRequestToPool”完成的,一旦请求到来,它将为每个请求创建一个线程并调用syncize方法。在同步方法“ RequestSynchronization”中,我想对具有ID的现有Job执行检查操作(如果存在),如果没有返回响应,则等待上一个通知,然后继续进行处理。

一旦使用路由“ removeRequestFromPool”结束了作业,则将删除该作业的当前ID,并应通知该组中的新作业(相同ID)继续。

我想知道如何实现上述方案或任何其他合适的方法?

下面是供参考的示例代码

骆驼路线:

from("direct:updateTicket")
    .to("direct:addRequestToPool")
    .to("direct:getDetailForTicket")
    .to("direct:updateDetailForTroubleTicket")
    .to("direct:getDetailForTicket")
    .to("direct:removeRequestFromPool")
    .to("direct:endRoute");

API调用:

public class TicketRequestManagement {

    public SimpleactionResponse addRequestToPool(int docketNo) throws InterruptedException {

        SimpleactionResponse response = new SimpleactionResponse();

        RequestSynchronization requestSynchronization = new RequestSynchronization(docketNo);
        Thread thread = new Thread(requestSynchronization);
        thread.start();
        thread.join();
        return response;
    }
}

同步方法:

public class RequestSynchronization implements Runnable {

    private static Logger logger = Logger.getLogger(RequestSynchronization.class);

    private int docketNo;

    public RequestSynchronization(int docketNo) {
        super();
        this.docketNo = docketNo;
    }

    @Override
    public void run() {
        synchronizeRequests(docketNo);
    }

    public synchronized static void synchronizeRequests(int docketNo) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

我正在考虑将docketId添加到静态数据结构(集合)中,并在过程完成后检查是否存在ID等待(我应如何执行此等待?),从集合中删除ID并通知等待过程从那里恢复(如何执行此通知相同的ID?)。

让我知道是否需要澄清。

PS:我不是要睡觉,这只是为了进行同步测试。

谢谢。

ymsabc 回答:如何使线程/作业等待直到上一个作业完成,然后在某些通知上恢复它

我的建议:异步。例如,使用JMS(不是直接使用Camels excellent JMS support,而是使用JMS)分离请求和处理请求。这样,您可以简单地通过使用方计数来控制整个线程。

您的情况应该是

  • addRequestToPool将请求放入JMS队列
  • JMS使用者使用队列中的请求并进行处理
  • 使用 message groups来同步具有相同ID的请求
  • 使用用户数量来确保并行处理其他ID并进行扩展(您编写了0行代码用于线程操作)
  • 即使处理组件停机进行维护,客户端仍然可以发送更新请求

消息组是JMS代理功能(至少是ActiveMQ)。您将ID设置为JMSXGroupID,并且代理确保具有相同ID的所有邮件均由同一使用者处理。如果您有5个使用者,并且您收到10条具有相同ID的消息,则1个使用者完成所有工作。具有另一个ID的消息到达时,另一个使用者将并行处理该消息。

由您决定要构建一个大型的JMS组件来完成所有处理,还是要构建多个组件来在处理工作流程中通过多个队列传递消息。

,

首先,不要手动启动Thread,这就是ExecutorService的作用。

第二,我建议您在Map<Integer,Future<>>中维护启动的同步,以便您可以检查该ID是否已经在运行。

草稿是

class TicketRequestService {
    private Map<Integer,Future<?>> runningRequests = new ConcurrentHashMap<>();
    ExecutorService executor = Executors.newCachedThreadPool();

    public void synchronizeRequest(int docketId) {
        // get the running Future if present,otherwise...
        Future<?> future = runningRequests.computeIfAbsent(docketId,// ...create a new one
            id-> executor.submit(() -> {
                RequestSynchronization.synchronizeRequests(id));
                // remove future from map after sync is finished
                runningRequests.remove(docketId);
            });
        future.get(); // wait until finished; this throws Exceptions,you need to handle them
        return new SimpleActionResponse();
   }
}
本文链接:https://www.f2er.com/3146064.html

大家都在问