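I have Spring Boot services that communicate through RabbitMQ.
I am writing tests to make sure the flow stays transactional across multiple transactions.
Example: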
@Transactional
@RabbitListener
Queue Listener 1
1- receive message
2- call to Class 1 Method 1
@Transactional
Class 1 Method 1
1- Persist some data in data base
2- Publish a Message in a 2nd Queue
@Transactional
@RabbitListener
Queue Listener 2
- read message
- doStuff()
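I would like my services to behave as follows.
If an exception is thrown in Queue Listener 2 (for example in doStuff()), I also want to roll back the Class 1 Method 1 transaction.
I also want retry functionality.
Configuring the transactions with Propagation.SUPPORTS or Propagation.REQUIRED does not seem to work either.
The Rabbit listeners are annotated with @Transactional.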
To achieve this, I configured the advice chain of the SimpleRabbitListenerContainerFactory with a StatefulRetryOperationsInterceptor.
I configured the transaction manager, which in my application is a JpaTransactionManager.
The RabbitTemplate has channelTransacted set to true, and it uses a message converter with jackson2JsonMessageConverter.setCreateMessageIds(true); so that message IDs are preserved.
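To make clear why the message IDs matter for the stateful retry, here is a minimal plain-Java sketch of the idea as I understand it: the attempt count has to survive between deliveries, so it is keyed by message ID. This is only an illustration, not the Spring Retry implementation; StatefulRetrySketch, onDelivery, Outcome and the limit of 3 are hypothetical names and values.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java illustration; it is not the Spring Retry implementation. */
class StatefulRetrySketch {

    /** Hypothetical limit, standing in for maxAttempts from the configuration. */
    static final int MAX_ATTEMPTS = 3;

    /** Attempt counters survive between deliveries because they are keyed by message ID. */
    static final Map<String, Integer> attemptsByMessageId = new HashMap<>();

    enum Outcome { PROCESSED, REDELIVER, GIVE_UP }

    /** Handles one delivery of the message with the given ID. */
    static Outcome onDelivery(String messageId, Runnable listener) {
        // Increment (or start) the counter for this message ID.
        int attempt = attemptsByMessageId.merge(messageId, 1, Integer::sum);
        try {
            listener.run();
            attemptsByMessageId.remove(messageId); // success: forget the state
            return Outcome.PROCESSED;
        } catch (RuntimeException e) {
            if (attempt < MAX_ATTEMPTS) {
                return Outcome.REDELIVER; // the same ID is expected to arrive again
            }
            attemptsByMessageId.remove(messageId); // attempts exhausted
            return Outcome.GIVE_UP;
        }
    }

    public static void main(String[] args) {
        Runnable alwaysFails = () -> { throw new IllegalStateException("doStuff() failed"); };
        for (int delivery = 1; delivery <= MAX_ATTEMPTS; delivery++) {
            System.out.println("delivery " + delivery + ": " + onDelivery("msg-1", alwaysFails));
        }
    }
}
```

Without a stable message ID there would be nothing to key the counter on, which is why I enabled setCreateMessageIds(true).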
Here is my SimpleRabbitListenerContainerFactory configuration:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        final SimpleRabbitListenerContainerFactoryConfigurer configurer,
        final PlatformTransactionManager platformTransactionManager) throws IOException {
    final SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
    final CachingConnectionFactory cachingConnectionFactory = cachingConnectionFactory();
    container.setConnectionFactory(cachingConnectionFactory);
    container.setMessageConverter(jackson2JsonMessageConverter());
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    container.setDefaultRequeueRejected(false);
    container.setConsecutiveIdleTrigger(1);
    container.setConcurrentConsumers(1);
    container.setMaxConcurrentConsumers(1);
    // Transacted channel, synchronized with the application's transaction manager
    container.setChannelTransacted(true);
    container.setTransactionManager(platformTransactionManager);
    // These two calls override the concurrency values set above
    container.setConcurrentConsumers(listenerConcurrency);
    container.setMaxConcurrentConsumers(maxListenerConcurrency);
    // Stateful retry: the exception is rethrown and the message is redelivered
    final StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor = RetryInterceptorBuilder.stateful()
            .retryPolicy(new SimpleRetryPolicy(maxAttempts, exceptionsTriggeringRetry()))
            .backOffPolicy(backOffPolicy())
            .build();
    container.setAdviceChain(statefulRetryOperationsInterceptor);
    configurer.configure(container, cachingConnectionFactory);
    return container;
}

@Bean
public Jackson2JsonMessageConverter jackson2JsonMessageConverter() {
    final ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.enable(MapperFeature.ACCEPT_CASE_INSENSITIVE_ENUMS);
    objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    final Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(objectMapper);
    // Generate a message ID when none is present; stateful retry needs one
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

@Bean
public RabbitTemplate transactedRabbitTemplate(
        @Named("cachingConnectionFactory") final ConnectionFactory connectionFactory,
        @Named("jackson2JsonMessageConverter") final Jackson2JsonMessageConverter jackson2MessageConverter) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(jackson2MessageConverter);
    rabbitTemplate.setChannelTransacted(true);
    return rabbitTemplate;
}
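The retry policy and its number of attempts seem to be honored, but transactional integrity (with rollback) does not seem to be enforced. There is no rollback of the earlier transactions: a failure in the third transaction does not trigger a rollback of the second one. Errors inside a single transactional unit are, of course, rolled back correctly.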
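One possible reading of what I observe, sketched in plain Java with no Spring or RabbitMQ involved: on a transacted channel, a published message only becomes visible when the publishing transaction commits, so Listener 2 can only run after the first transaction has already completed, and its rollback can only undo its own work. The sketch below is a simulation under that assumption; CommitBoundarySketch, LocalTx, database and secondQueue are hypothetical stand-ins, not real APIs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Plain-Java simulation; all names here are hypothetical stand-ins. */
class CommitBoundarySketch {

    /** Stand-ins for the database table and the second queue. */
    static final List<String> database = new ArrayList<>();
    static final Deque<String> secondQueue = new ArrayDeque<>();

    /** A local transaction that buffers writes and publishes until commit. */
    static class LocalTx {
        private final List<String> pendingRows = new ArrayList<>();
        private final List<String> pendingMessages = new ArrayList<>();

        void persist(String row) { pendingRows.add(row); }
        void publish(String message) { pendingMessages.add(message); }

        /** Rows and messages become visible to others only here. */
        void commit() {
            database.addAll(pendingRows);
            secondQueue.addAll(pendingMessages);
        }

        void rollback() {
            pendingRows.clear();
            pendingMessages.clear();
        }
    }

    /** Listener 1: persists a row and publishes to the second queue in one transaction. */
    static void listener1(String payload) {
        LocalTx tx = new LocalTx();
        try {
            tx.persist("row-for-" + payload);
            tx.publish(payload);
            tx.commit();
        } catch (RuntimeException e) {
            tx.rollback();
            throw e;
        }
    }

    /** Listener 2: runs later, in its own transaction, and fails in doStuff(). */
    static boolean listener2() {
        String message = secondQueue.poll();
        if (message == null) {
            return false; // nothing was delivered
        }
        LocalTx tx = new LocalTx();
        try {
            doStuff(message);
            tx.commit();
            return true;
        } catch (RuntimeException e) {
            tx.rollback(); // only this transaction's own work is undone
            return false;
        }
    }

    static void doStuff(String message) {
        throw new IllegalStateException("failure while handling " + message);
    }

    public static void main(String[] args) {
        listener1("order-1");
        boolean handled = listener2();
        System.out.println("listener 2 succeeded: " + handled);
        System.out.println("rows still in database: " + database);
    }
}
```

In this simulation the row written by Listener 1 is still present after Listener 2 fails, which matches the behavior I am seeing in my tests.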
Edit
I also tried
container.setAdviceChain(new TransactionInterceptor(platformTransactionManager, new Properties()));
but it still does not seem to work.