Spring Integration JMS使用者不使用所有消息

设置

我有一个名为 Dispatcher 的Spring Boot应用程序。它在1台计算机上运行,​​并具有嵌入式activeMQ Broker:

  @Bean
  public BrokerService broker(activeMQProperties properties) throws Exception {
    BrokerService broker = new BrokerService();
    broker.setPersistent(false);
    broker.addConnector(properties.getBrokerUrl());
    return broker;
  }

将任务写入JMS队列:

  @Bean
  public IntegrationFlow outboundFlow(activeMQConnectionFactory connectionFactory) {
    return IntegrationFlows
      .from(taskQueue())
      .bridge(Bridges.blockingPoller(outboundTaskScheduler()))
      .transform(outboundTransformer)
      .handle(Jms.outboundAdapter(connectionFactory)
        .extractPayload(false)
        .destination(JmsQueueNames.STANDARD_TASKS))
      .get();
  }

  @Bean
  public QueueChannel standardTaskQueue() {
    return MessageChannels.priority()
      .comparator(TASK_PRIO_COMPARATOR)
      .get();
  }

  // 2 more queues with different names but same config

Worker 应用程序可在10台计算机上运行,​​每台计算机均具有20个内核,并且其配置如下:

  @Bean
  public IntegrationFlow standardTaskInbound(ConnectionFactory connectionFactory) {
    int maxWorkers = 20;
    return IntegrationFlows
      .from(Jms.channel(connectionFactory)
        .sessionTransacted(true)
        .concurrentConsumers(maxWorkers)
        .taskExecutor(
          Executors.newFixedThreadPool(maxWorkers,new CustomizableThreadFactory("standard-"))
        )
        .destination(JmsQueueNames.STANDARD_TASKS))
      .channel(ChannelNames.TASKS_INBOUND)
      .get();
  }

  // 2 more inbound queues with different names but same config

对第二个队列重复此操作,外加1个特殊情况。因此,共有 401个消费者

观察

使用JConsole,我可以看到activeMQ队列中有一些任务:

[TODO插入屏幕截图]

正如预期的那样,在任何Worker机器上,都有20个使用者线程:

[TODO插入屏幕截图]

但是,即使队列中仍有消息,大多数(如果不是全部)都是 idle 。使用我们的监视工具,我发现在给定的时间常数为400的情况下,在任何给定的时间大约要处理50到400个任务。

我还观察到Spring为每个使用者创建了AbstractPollingMessageListenerContainer,这似乎导致每个应用程序每个队列每秒打开1个JMS连接(每秒33个连接)。

调查

因此,我发现I do not receive messages in my second consumer暗示了prefetch是罪魁祸首。这听起来很合理,所以我在每个工作人员上配置了tcp://dispatcher:61616?jms.prefetchPolicy.queuePrefetch=1。然后,在任何时候都只处理大约25个任务,这对我来说根本没有意义。

问题

我似乎不明白发生了什么,并且由于我没有足够的时间进行调查,所以我希望有人能指出我正确的方向。哪些因素可能是原因?使用者/连接数?预取?还有吗?

nihaolizhiguo 回答:Spring Integration JMS使用者不使用所有消息

原来是由预取策略引起的。在我的情况下,正确的配置是使用i

在较早的(失败的)测试中,我使用了tcp://dispatcher:61616?jms.prefetchPolicy.all=0,但事后看来,我不确定是否在正确的位置进行了配置。

本文链接:https://www.f2er.com/2773010.html

大家都在问