编辑
只需了解如何在一项服务中运行多个使用者:
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(RENDER_QUEUE);
container.setConcurrentConsumers(concurrentConsumers); // setting this in env
container.setMessageListener(listenerAdapter);
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(RenderMessageConsumer receiver) {
return new MessageListenerAdapter(receiver,"reciveMessageFromRenderQueue");
}
现在剩下的唯一问题是:我怎么有全局限制?那么,AMQP接收器的多个实例如何共享使用方总数?因此,我想将并发消费者的全局数量设置为10,运行2个consumerSerivce实例,并让每个实例围绕5个消费者运行。可以由rabbitMq管理吗?
我有一个Spring服务,该服务使用AMQP消息并为每个消息调用一个http资源。 http调用完成后,将调用另一个队列以报告错误或完成。只有这样,消息处理才能完成,并且下一条消息将从队列中取出。
// simplified
@RabbitListener(queues = RENDER_QUEUE)
public void reciveMessageFromRenderQueue(String message) {
try {
RenderMessage renderMessage = JsonUtils.stringToObject(message,RenderMessage.class);
String result = renderService.httpCallRenderer(renderMessage);
messageProducer.sendDoneMessage(result);
} catch (Exception e) {
logError(type,e);
messageProducer.sendErrorMessage(e.getMessage());
}
}
有时,队列中有成百上千的渲染消息,但是http调用运行时间很长,没有做很多事情。这很明显,因为我可以通过运行服务的多个实例来提高消息处理率,从而增加更多的使用者并多次调用http端点。一个实例恰好有一个消费者使用通道,因此实例的数量等于消费者的数量。但是,由于转发消息和处理结果,这大大增加了内存使用量(由于该服务使用spring)。
所以我想,我将异步执行http调用,并在接受消息后立即返回:
.httpCallRendererAsync(renderMessage)
.subscribeon(Schedulers.newThread())
.subscribe(new Observer<String >() {
public void onNext(String result) {
messageProducer.sendDoneMessage(result);
}
public void onError(Throwable throwable) {
messageProducer.sendErrorMessage(throwable.getMessage());
}
});
但是,这会使无法处理1000个或更多同时请求的http端点过载。
我需要的是我的amqp服务从队列中获取一定数量的消息,在单独的线程中处理它们,在每个线程中进行http调用,然后返回“已处理消息”。但是,从队列中获取的消息量需要在该服务的多个实例之间共享,因此,如果最大值为10,消息消耗为循环方式,则前5个奇数消息应由实例1处理,前5个偶数消息应由实例1处理。实例2,并且一旦一个实例处理完消息,就应该从队列中取出另一个实例。
我发现了一些类似的事情,例如消费者和渠道as described by rabbitmq带有草的预取。还有使用prefetchCount和transactionSize described here的spring-rabbit实现。但是,对于单个正在运行的实例,这似乎没有任何作用。它不会产生其他线程来同时处理更多消息。当然,这不会减少在我的异步情况下处理的消息的数量,因为这些消息被立即视为“已处理”。
@Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchContainerFactory(ConnectionFactory rabbitConnectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory);
factory.setPrefetchCount(5);
factory.setTxSize(5);
return factory;
}
// and then using
@RabbitListener(queues = RENDER_QUEUE,containerFactory = "prefetchContainerFactory")
对我来说,最重要的要求似乎是在一个实例中应处理多条消息,而实例之间应共享并发处理的最大消息数。 可以使用rabbitMq和spring吗?还是我必须在两者之间实现一些东西。
在早期阶段,仅在一个实例中进行并发消息处理而不共享该限制可能是可以接受的。然后,我必须在缩放实例数的同时使用环境变量手动配置限制。