Spring Listener容器-事务管理器-Spring重试-多个事务之间

我通过RabbitMQ拥有Spring引导服务。

我正在开发测试,以确保流程在多个事务之间保持事务状态。

示例:

@Transactional
@RabbitListener
Queue Listener 1
1- receive message
2- call to Class 1 Method 1

@Transactional
Class 1 Method 1
1- Persist some data in data base
2- Publish a Message in a 2nd Queue

@Transactional
@RabbitListener
Queue Listener 2
- read message
- doStuff()

我希望我的服务具有以下行为。

如果在Queue Listener 2中引发了异常(例如在doStuff()中),我也想回退Class 1 Method 1事务。 我也想重试功能。

使用Propagation.SUPPORTSPropagation.REQUIRED配置事务似乎也不起作用。

兔子侦听器标注为@Transactional。

为此,我将SimpleRabbitListenerContainerFactory咨询链配置为StatefulRetryOperationsInterceptor

我配置了事务管理器,该事务管理器在我的应用程序中为JpaTransactionmanager

兔子模板通道的“交易次数”设置为true,

还带有jackson2JsonmessageConverter.setCreateMessageIds(true);的Message Converter,用于保留消息ID。

我在这里附加我的SimpleRabbitListenerContainerFactory配置:

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(final SimpleRabbitListenerContainerFactoryConfigurer configurer,final PlatformTransactionmanager platformTransactionmanager) throws IOException {

        final SimpleRabbitListenerContainerFactory container = new SimpleRabbitListenerContainerFactory();
        final CachingConnectionFactory cachingConnectionFactory = cachingConnectionFactory();

        container.setConnectionFactory(cachingConnectionFactory);
        container.setMessageConverter(jackson2JsonmessageConverter());
        container.setacknowledgeMode(AcknowledgeMode.AUTO);
        container.setDefaultRequeueRejected(false);
        container.setConsecutiveIdleTrigger(1);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        container.setChannelTransacted(true);

        container.setTransactionmanager(platformTransactionmanager);

        container.setConcurrentConsumers(listenerConcurrency);
        container.setMaxConcurrentConsumers(maxListenerConcurrency);

        final StatefulRetryOperationsInterceptor statefulRetryOperationsInterceptor = RetryInterceptorBuilder.stateful()
                .retryPolicy(new SimpleRetryPolicy(maxAttempts,exceptionsTriggeringRetry()))
                .backOffPolicy(backOffPolicy())
                .build();

        container.setadviceChain(statefulRetryOperationsInterceptor);
        configurer.configure(container,cachingConnectionFactory);

        return container;
    }

    @Bean
    public Jackson2JsonmessageConverter jackson2JsonmessageConverter() {

        final ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.enable(MapperFeature.accEPT_CASE_INSENSITIVE_ENUMS);
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,false);

        final Jackson2JsonmessageConverter jackson2JsonmessageConverter = new Jackson2JsonmessageConverter(objectMapper);
        jackson2JsonmessageConverter.setCreateMessageIds(true);

        return jackson2JsonmessageConverter;
    }

    @Bean
    public RabbitTemplate transactedRabbitTemplate(@Named("cachingConnectionFactory") final ConnectionFactory connectionFactory,@Named("jackson2JsonmessageConverter") final Jackson2JsonmessageConverter jackson2MessageConverter){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2MessageConverter);
        rabbitTemplate.setChannelTransacted(true);
        return rabbitTemplate;
    }

重试政策及其重试次数似乎受到尊重,

但是事务完整性(带有回滚)似乎没有被执行(因此以前的事务中没有回滚,即:第三次事务失败不会触发第二次事务回滚),但是当然,同一事务单元内的错误可以正常工作。

编辑

也尝试过

container.setadviceChain(new TransactionInterceptor(platformTransactionmanager,new Properties()));

但似乎仍然无法正常工作。

daiandy001 回答:Spring Listener容器-事务管理器-Spring重试-多个事务之间

交易不能跨越这样的队列;发布到队列是第一笔交易之路的终点。

生产者和消费者彼此隔离。

由于您还使用Rabbit事务,因此在第一个事务提交之前,侦听器2甚至都不会看到该消息。

本文链接:https://www.f2er.com/3158579.html

大家都在问