每x分钟从骆驼队列中消费

试图实现一种方法,使我的使用者每隔30分钟左右从队列中接收消息。 就上下文而言,我的错误队列中有20条消息,直到经过x分钟为止,然后我的路由消耗了队列中的所有消息,并进入“睡眠”状态,直到又过了30分钟。

不确定执行此操作的最佳方法,我尝试过spring @ Scheduled,camel timer等,但都没有达到我的期望。我一直在尝试使它与路由策略一起使用,但是在正确的功能上没有优势。它似乎只是立即从队列中消耗掉了。

路由策略是正确的路径还是其他要使用的东西?

haoqiwei123 回答:每x分钟从骆驼队列中消费

从队列中读取的路由将始终尽可能快地读取任何消息。

您可以做的一件事是启动/停止或暂停使用消息的路由,因此请进行以下设置:

route 1: error_q_reader,which goes from('jms').
route 2: a timed route that fires every 20 mins

路线2可以使用control bus组件来启动路线。

 from('timer?20mins') // or whatever the timer syntax is...
    .to("controlbus:route?routeId=route1&action=start")

这里最棘手的部分是知道何时停止路线。您让它运行5分钟吗?消息全部消耗完后,是否要停止它?可能有一种方法可以运行另一条可以检查队列深度的路由(例如,每1分钟左右),如果它为0,然后关闭route 1,则可以使其正常工作,但是我可以保证,这样可以当您尝试处理许多异步操作时会很混乱。

您还可以尝试更奇特的操作,例如自定义QueueBrowseStrategy,当队列中没有消息时,该自定义route 1可以触发事件以关闭// Your input array $array = array( array('Tickets Open' => array('01Oct-05Oct' => 0)),array('Tickets Open' => array('06Oct-12Oct' => 0)),array('Tickets Open' => array('13Oct-19Oct' => 1)),array('Tickets Open' => array('20Oct-26Oct' => 18)),array('Tickets Open' => array('27Oct-31Oct' => 9)),); $arrColums = array_column($array,'Tickets Open'); $result = []; foreach ($arrColums as $key => $value) { $result['Tickets Open'][key($value)] = $value[key($value)]; } // result Array ( [Tickets Open] => Array ( [01Oct-05Oct] => 0 [06Oct-12Oct] => 0 [13Oct-19Oct] => 1 [20Oct-26Oct] => 18 [27Oct-31Oct] => 9 ) )

,

我构建了一个Customer Bean以排空队列并关闭,但这不是一个非常优雅的解决方案,我很想找到一个更好的解决方案。

public class TriggeredPollingConsumer {

    private ConsumerTemplate consumer;
    private Endpoint consumerEndpoint;
    private String endpointUri;
    private ProducerTemplate producer;
    private static final Logger logger = Logger.getLogger( TriggeredPollingConsumer.class );

    public TriggeredPollingConsumer() {};

    public TriggeredPollingConsumer( ConsumerTemplate consumer,String endpoint,ProducerTemplate producer ) {
        this.consumer = consumer;
        this.endpointUri = endpoint;
        this.producer = producer;
    }

    public void setConsumer( ConsumerTemplate consumer) {
        this.consumer = consumer;
    }

    public void setProducer( ProducerTemplate producer ) {
        this.producer = producer;
    }

    public void setConsumerEndpoint( Endpoint endpoint ) {
        consumerEndpoint = endpoint;
    }

    public void pollConsumer() throws Exception {
        long count = 0;
        try {
            if ( consumerEndpoint == null ) consumerEndpoint = consumer.getCamelContext().getEndpoint( endpointUri );
            logger.debug( "Consuming: " + consumerEndpoint.getEndpointUri() );
            consumer.start();
            producer.start();
            while ( true ) {
                logger.trace("Awaiting message: " + ++count );
                Exchange exchange = consumer.receive( consumerEndpoint,60000 );
                if ( exchange == null ) break;
                logger.trace("Processing message: " + count );
                producer.send( exchange );
                consumer.doneUoW( exchange );
                logger.trace("Processed message: " + count );
            }
            producer.stop();
            consumer.stop();
            logger.debug( "Consumed " + (count - 1) + " message" + ( count == 2 ? "." : "s." ) );
        } catch ( Throwable t ) {
            logger.error("Something went wrong!",t );
            throw t;
        }
    }

}

您配置Bean,然后从计时器调用Bean方法,并配置直接路由以处理队列中的条目。

from("timer:...")
  .beanRef("consumerBean","pollConsumer");
from("direct:myRoute")
  .to(...);

然后它将读取队列中的所有内容,并在一分钟内没有条目到达时立即停止。您可能想减少时间,但是我发现一秒钟意味着如果JMS有点慢,它将在耗尽队列中途超时。

我也一直在研究sjms-batch组件,以及如何将其与pollEnrich模式一起使用,但是到目前为止,我还无法使它正常工作。 / p>

,

我已经解决了这一问题,通过在MicroServices方法中将应用程序用作CronJob,并赋予它正常关闭自身的功能,我们可以设置属性camel.springboot.duration-max-idle-seconds。因此,您的JMS使用者路线保持简单。

另一种方法是声明一条路由,以控制JMS使用者路由的“生命周期”(启动,睡眠和恢复)。

我强烈建议您使用第一种方法。

,

如果您使用ActiveMQ,则可以利用其中的Scheduler feature

只需将JMS属性设置为 AMQ_SCHEDULED_DELAY,就可以延迟在代理上传递消息的时间 {毫秒}。在骆驼路线上非常容易

.setHeader("AMQ_SCHEDULED_DELAY",60000)

这并不是您要查找的内容,因为它不会每30分钟清空一次队列,而是将每条消息延迟30分钟

请注意,您必须在代理配置中启用schedulerSupport。否则,延迟属性将被忽略。

<broker brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
...
</broker>
,

你应该考虑Aggregation EIP

from(URI_WAITING_QUEUE)
    .aggregate(new GroupedExchangeAggregationStrategy())
        .constant(true)
        .completionInterval(TIMEOUT)
    .to(URI_PROCESSING_BATCH_OF_EXCEPTIONS);

这个例子描述了以下规则:所有传入的 URI_WAITING_QUEUE 对象将被分组到 List 中。 constant(true) 是一个分组条件(没有任何)。并且每 TIMEOUT 个周期(以毫秒为单位)所有分组的对象都将被传递到 URI_PROCESSING_BATCH_OF_EXCEPTIONS 队列中。

因此 URI_PROCESSING_BATCH_OF_EXCEPTIONS 队列将处理 List 个要处理的对象。您可以应用Split EIP将它们拆分并一一处理。

本文链接:https://www.f2er.com/3105970.html

大家都在问