设置
我有一个名为 Dispatcher 的Spring Boot应用程序。它在1台计算机上运行,并具有嵌入式activeMQ Broker:
@Bean
public BrokerService broker(activeMQProperties properties) throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.addConnector(properties.getBrokerUrl());
return broker;
}
将任务写入JMS队列:
@Bean
public IntegrationFlow outboundFlow(activeMQConnectionFactory connectionFactory) {
return IntegrationFlows
.from(taskQueue())
.bridge(Bridges.blockingPoller(outboundTaskScheduler()))
.transform(outboundTransformer)
.handle(Jms.outboundAdapter(connectionFactory)
.extractPayload(false)
.destination(JmsQueueNames.STANDARD_TASKS))
.get();
}
@Bean
public QueueChannel standardTaskQueue() {
return MessageChannels.priority()
.comparator(TASK_PRIO_COMPARATOR)
.get();
}
// 2 more queues with different names but same config
Worker 应用程序可在10台计算机上运行,每台计算机均具有20个内核,并且其配置如下:
@Bean
public IntegrationFlow standardTaskInbound(ConnectionFactory connectionFactory) {
int maxWorkers = 20;
return IntegrationFlows
.from(Jms.channel(connectionFactory)
.sessionTransacted(true)
.concurrentConsumers(maxWorkers)
.taskExecutor(
Executors.newFixedThreadPool(maxWorkers,new CustomizableThreadFactory("standard-"))
)
.destination(JmsQueueNames.STANDARD_TASKS))
.channel(ChannelNames.TASKS_INBOUND)
.get();
}
// 2 more inbound queues with different names but same config
对第二个队列重复此操作,外加1个特殊情况。因此,共有 401个消费者。
观察
使用JConsole,我可以看到activeMQ队列中有一些任务:
[TODO插入屏幕截图]
正如预期的那样,在任何Worker机器上,都有20个使用者线程:
[TODO插入屏幕截图]
但是,即使队列中仍有消息,大多数(如果不是全部)都是 idle 。使用我们的监视工具,我发现在给定的时间常数为400的情况下,在任何给定的时间大约要处理50到400个任务。
我还观察到Spring为每个使用者创建了AbstractPollingMessageListenerContainer
,这似乎导致每个应用程序每个队列每秒打开1个JMS连接(每秒33个连接)。
调查
因此,我发现I do not receive messages in my second consumer暗示了prefetch
是罪魁祸首。这听起来很合理,所以我在每个工作人员上配置了tcp://dispatcher:61616?jms.prefetchPolicy.queuePrefetch=1
。然后,在任何时候都只处理大约25个任务,这对我来说根本没有意义。
问题
我似乎不明白发生了什么,并且由于我没有足够的时间进行调查,所以我希望有人能指出我正确的方向。哪些因素可能是原因?使用者/连接数?预取?还有吗?