KafkaProducer和KafkaTransactionManager在异常上不回滚

客观-

从(源)MQ队列消费消息并发布到

a)另一个(目标)MQ队列和

b)事务中的一个Kafka主题,从而避免在MQ或Kafka发布失败的情况下从源MQ删除消息。

使用的框架

春季启动版本-2.1.5

Spring JMS -5.1.7

Spring Kafka- 2.2.6

融合Kafka- 5.3

MQ -9

Kafka配置

    @Configuration
@ConfigurationProperties(prefix = "spring.kafka")
@Slf4j
@Getter
@Setter
@ToString
@EnableTransactionmanagement
public class KafkaConfig {
    /** injected local properties */
    public Map<String,Object> producerConfigs() throws IOException{
        Map<String,Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,valueSerializer);
        props.put(ProducerConfig.ACKS_CONFIG,acks);
        props.put(ProducerConfig.RETRIES_CONFIG,retries);
        log.info("Value of transaction id 0 {}",transactionIdPrefix);
        props.put(ProducerConfig.TRANSactIONAL_ID_CONFIG,transactionIdPrefix);
        sslCommonconfigs(props);
        return props;
    }

    public Map<String,Object> consumerConfigs() throws IOException{
        Map<String,Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,autoOffsetReset);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,enableAutocommit);
        sslCommonconfigs(props);
        return props;
    }

    public Map<String,Object> sslCommonconfigs(Map<String,Object> props) throws IOException {
        log.info("kafka config {}",this);
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,schemaRegistryUrl);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,specificAvroReader);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,"SSL");
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,FileUtil.decodeCertFile(trustStoreValue,"kafka_truststore.jks"));
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,trustStorePw);
        props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE); //"JKS"
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,FileUtil.decodeCertFile(keyStoreValue,"kafka_keystore.jks"));
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,keyStorePw);
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG,keyStorePw);
        return props;
    }

    @Bean
    public ProducerFactory producerFactory() throws IOException {
        DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
        producerFactory.setTransactionIdPrefix(this.transactionIdPrefix);
        return producerFactory;
    }

    @Bean
    public KafkaTemplate<String,RawPage> ddaKafkaTemplate() throws IOException {
        return new KafkaTemplate<String,RawPage>(producerFactory());
    }
    @Bean
    public KafkaTransactionmanager<String,RawPage> kafkaTransactionmanager(ProducerFactory<String,RawPage> producerFactory) {
        log.info("producerFactory.transactionCapable() {}",producerFactory.transactionCapable());
        KafkaTransactionmanager transactionmanager = new KafkaTransactionmanager(producerFactory);
        transactionmanager.setnestedTransactionAllowed(true);
        transactionmanager.setTransactionSynchronization(AbstractPlatformTransactionmanager.SYNCHRONIZATION_ALWAYS);
        return transactionmanager;
    }

** Applicationconfig类*

@Slf4j
@Configuration
public class Applicationconfig {
    @Bean
    public JmsListenerContainerFactory<?> myMessageFactory(ConnectionFactory connectionFactory,DefaultJmsListenerContainerFactoryConfigurer configurer,ChainedTransactionmanager chainedTransactionmanager) {
        log.debug("Connection factory instance as received {} {}",connectionFactory,configurer);
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        factory.setTransactionmanager(chainedTransactionmanager);
        factory.setSessionTransacted(true);
        configurer.configure(factory,connectionFactory);
        log.debug("Returning the myMessageFactory factory instance as {}",factory);
        return factory;
    }
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory){
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }
    @Bean
    public JmsTransactionmanager jmsTransactionmanager(ConnectionFactory connectionFactory) {
        return new JmsTransactionmanager(connectionFactory);
    }

    @Bean
    public ChainedKafkaTransactionmanager chainedTransactionmanager(KafkaTransactionmanager kafkaTransactionmanager,JmsTransactionmanager jmsTransactionmanager){
        return new ChainedKafkaTransactionmanager(jmsTransactionmanager,kafkaTransactionmanager );
    }
}

实际消费者和发布代码

    @Service
@Slf4j
@Setter
@Getter
public class MyMessageProcessor {
    @Autowired
    private KafkaTemplate<String,Event> kafkaTemplate;
    @Autowired
    private JmsTemplate jmsTemplate;

    @JmsListener(destination = "desintationQueue",containerFactory = "myMessageFactory")
    public void receiveMessage(TextMessage message){
        try {
            log.info("Received message {}",message.getText());
            send(destinationQueueName,message.getText());
            // build avro event
            publish(evnet);
            // only acknowledge if the message is successfully processed till kafka publication
            message.acknowledge();
        }catch (JMSException|CustomKafkaPublicationException e){
            log.error("Error in consuming the message from sourceSystem {}",ExceptionUtils.getStackTrace(e));            
        }
    }
    public void send(String queueName,final String msg) throws RawPagePublicationException{
        if(StringUtils.isEmpty(msg)|| StringUtils.isEmpty(queueName)){
            String errorMessage = String.format("Incorrect message and queue details msg %s queueName %s.",msg,queueName);
            log.error(errorMessage);
            throw new CustomKafkaPublicationException(errorMessage);
        }
        log.info("Publishing the message to destination queue {} at time  in millis {}",queueName,System.currentTimeMillis());
        jmsTemplate.convertAndSend(queueName,msg);
        log.info("Published the message to queue {} at time in millis {}",System.currentTimeMillis());
    }

Spring主启动类

@SpringBootApplication(exclude = {KafkaAutoConfiguration.class})
@EnableJms
@Slf4j
@EnableTransactionmanagement
@EnableRetry
@EnableAutoConfiguration(exclude = 
{JmsHealthIndicatorAutoConfiguration.class,KafkaAutoConfiguration.class})
public class MyConsumerApplication {

错误日志

    - [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Clearing transaction synchronization
2019-12-08 19:39:49.270 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionmanager   : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT
2019-12-08 19:39:49.332 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionmanager   : Created JMS transaction on Session [com.ibm.mq.jms.MQSession@1a7d298f] from Connection [com.ibm.mq.jms.MQConnection@5d644e4a]
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Bound value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.333 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.Transactionmanager   : [Producer clientId=producer-1,transactionalId=SAMP-CON-0] Transition from state READY to IN_TRANSactION
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Bound value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Initializing transaction synchronization
2019-12-08 19:39:49.333 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.848 DEBUG 37524 --- [enerContainer-1] o.s.j.l.DefaultMessageListenerContainer  : Received message of type [class com.ibm.jms.JMSTextMessage] from consumer [com.ibm.mq.jms.MqqueueReceiver@2826f379] of transactional session [com.ibm.mq.jms.MQSession@1a7d298f]
2019-12-08 19:39:49.849 DEBUG 37524 --- [enerContainer-1] .s.j.l.a.MessagingMessageListenerAdapter : Processing [org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionmessage@53b59c04]
2019-12-08 19:39:49.849  INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor      : Received message 1221222112#
2019-12-08 19:39:49.859  INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor      : Publishing the message to destination queue DEST_QUEUE at time  in millis 1575833989859
2019-12-08 19:39:49.861 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.861 DEBUG 37524 --- [enerContainer-1] o.springframework.jms.core.JmsTemplate   : Executing callback on JMS Session: com.ibm.mq.jms.MQSession@1a7d298f
2019-12-08 19:39:49.875 DEBUG 37524 --- [enerContainer-1] o.springframework.jms.core.JmsTemplate   : Sending created message: 
  JMSMessage class: jms_text
  JMSType:          null
  JMSDeliveryMode:  2
  JMSDeliveryDelay: 0
  JMSDeliveryTime:  0
  JMSExpiration:    0
  JMSPriority:      4
  JMSMessageID:     null
  JMSTimestamp:     0
  JMSCorrelationID: null
  JMSDestination:   null
  JMSReplyTo:       null
  JMSRedelivered:   false
1221222112#
2019-12-08 19:39:49.902 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.905  INFO 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor      : Published the message to queue DEST_QUEUE at time in millis 1575833989905
2019-12-08 19:39:49.997 DEBUG 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor      : Publishing message with key 106e096a-4633-49c8-abaa-a8d0bade84d2: value {"content": "1221222112#","sourceType": "MQ","sourceLocation": "MINT","msgType": null,"correlationId": "106e096a-4633-49c8-abaa-a8d0bade84d2","receivedTs": 1575833989997}
2019-12-08 19:39:49.999 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:49.999 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Retrieved value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.007 TRACE 37524 --- [enerContainer-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1,transactionalId=SAMP-CON-0] Requesting metadata update for topic TOPIC-SAMP-DATA.
2019-12-08 19:39:50.170  INFO 37524 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: O5fhv74bT9KIkV17ia8snQ
2019-12-08 19:39:50.309 ERROR 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor      : Error publishing raw page with exception org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.AvroTypeException: Not an enum: null for schema: {"type":"enum","name":"MsgType","namespace":"com.mysample.avro","symbols":["TYPE"]}
    at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:218)
    at org.apache.avro.specific.SpecificDatumWriter.writeEnum(SpecificDatumWriter.java:61)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:133)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:101)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:199)
    at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:106)
    at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
    at sun.reflect.NativeMethodaccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodaccessorImpl.invoke(NativeMethodaccessorImpl.java:62)
    at sun.reflect.DelegatingMethodaccessorImpl.invoke(DelegatingMethodaccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onmessage(MessagingMessageListenerAdapter.java:77)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeongoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.lang.Thread.run(Thread.java:748)
.
2019-12-08 19:39:50.310 ERROR 37524 --- [enerContainer-1] c.d.g.d.c.a.d.c.DDAMessageProcessor      : Error in consuming the message from sourceSystem com.mysample.consumer.exception.CustomKafkaPublicationException: Error publishing raw page with exception org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: org.apache.avro.AvroTypeException: Not an enum: null for schema: {"type":"enum","symbols":["TYPE"]}
    at org.apache.avro.generic.GenericDatumWriter.writeEnum(GenericDatumWriter.java:218)
    at org.apache.avro.specific.SpecificDatumWriter.writeEnum(SpecificDatumWriter.java:61)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:133)
    at org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:98)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
    at org.apache.avro.specific.SpecificDatumWriter.writeRecord(SpecificDatumWriter.java:83)
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:101)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65)
    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:841)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:381)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:199)
    at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:106)
    at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
    at sun.reflect.NativeMethodaccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodaccessorImpl.invoke(NativeMethodaccessorImpl.java:62)
    at sun.reflect.DelegatingMethodaccessorImpl.invoke(DelegatingMethodaccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onmessage(MessagingMessageListenerAdapter.java:77)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeongoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.lang.Thread.run(Thread.java:748)
.
    at com.mysample.consumer.DDAMessageProcessor.publish(DDAMessageProcessor.java:111)
    at com.mysample.consumer.DDAMessageProcessor.receiveMessage(DDAMessageProcessor.java:64)
    at sun.reflect.NativeMethodaccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodaccessorImpl.invoke(NativeMethodaccessorImpl.java:62)
    at sun.reflect.DelegatingMethodaccessorImpl.invoke(DelegatingMethodaccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:114)
    at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onmessage(MessagingMessageListenerAdapter.java:77)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:245)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeongoingLoop(DefaultMessageListenerContainer.java:1179)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076)
    at java.lang.Thread.run(Thread.java:748)

2019-12-08 19:39:50.312 TRACE 37524 --- [enerContainer-1] .s.j.l.a.MessagingMessageListenerAdapter : No result object given - no result to handle
2019-12-08 19:39:50.312 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Retrieved value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] bound to thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.316 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.Transactionmanager   : [Producer clientId=producer-1,transactionalId=SAMP-CON-0] Transition from state IN_TRANSactION to COMMITTING_TRANSactION
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.a.k.c.p.internals.Transactionmanager   : [Producer clientId=producer-1,transactionalId=SAMP-CON-0] Enqueuing transactional request (type=EndTxnRequest,transactionalId=SAMP-CON-0,producerId=435000,producerEpoch=39,result=COMMIT)
2019-12-08 19:39:50.317 DEBUG 37524 --- [ad | producer-1] o.a.k.c.p.internals.Transactionmanager   : [Producer clientId=producer-1,transactionalId=SAMP-CON-0] Not sending EndTxn for completed transaction since no partitions or offsets were successfully added
2019-12-08 19:39:50.317 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Clearing transaction synchronization
2019-12-08 19:39:50.317 DEBUG 37524 --- [ad | producer-1] o.a.k.c.p.internals.Transactionmanager   : [Producer clientId=producer-1,transactionalId=SAMP-CON-0] Transition from state COMMITTING_TRANSactION to READY
2019-12-08 19:39:50.317 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Removed value [org.springframework.kafka.core.KafkaResourceHolder@48a5eda2] for key [org.springframework.kafka.core.DefaultKafkaProducerFactory@64aad809] from thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionmanager   : Initiating transaction commit
2019-12-08 19:39:50.317 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionmanager   : Committing JMS transaction on Session [com.ibm.mq.jms.MQSession@1a7d298f]
2019-12-08 19:39:50.396 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Removed value [org.springframework.jms.connection.JmsResourceHolder@34a4adbe] for key [JMS_RESOURCE_KEY] from thread [DefaultMessageListenerContainer-1]
2019-12-08 19:39:50.411 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionmanager   : Resuming suspended transaction after completion of inner transaction
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Initializing transaction synchronization
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Initializing transaction synchronization
2019-12-08 19:39:50.412 TRACE 37524 --- [enerContainer-1] .s.t.s.TransactionSynchronizationmanager : Clearing transaction synchronization
2019-12-08 19:39:50.412 DEBUG 37524 --- [enerContainer-1] o.s.j.connection.JmsTransactionmanager   : Creating new transaction with name [null]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT

问题

Kakfa生产者不回滚事务,因此导致

  1. 目标队列中的数据不应该存在
  2. 源队列中不再有该消息
  3. kafka主题没有该消息。

尝试过的事情

  1. 在回滚中使用Throwable vs Exception

  2. 将要发布的代码放入队列和将主题放在单独的类中

我们如何测试适用于Kafka生产者的回滚方案?

teamoxiaoluo 回答:KafkaProducer和KafkaTransactionManager在异常上不回滚

at com.mysample.consumer.MyMessageProcessor$$FastClassBySpringCGLIB$$a0eef45f.invoke(<generated>)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736)

我在堆栈跟踪中没有看到事务拦截器,这意味着@Transactional不起作用-您需要在@EnableTransactionManagement类上使用@Configuration

但是,您实际上不需要@Transactional,只需将ChainedTransactionManager注入到侦听器容器工厂中,这样就可以在此处开始两个事务。

,

尝试使用ChainedKafkaTransactionManager并将所有使用的事务管理器连接到其中:

    @Bean
    public ChainedKafkaTransactionManager<String,String> chainedTransactionManager(DataSourceTransactionManager dataSourceTransactionManager,KafkaTransactionManager<String,String> kafkaTransactionManager) {
        return new ChainedKafkaTransactionManager<String,String>(kafkaTransactionManager,dataSourceTransactionManager);
    }

在我的情况下,DataSourceTransactionManager已由KafkaTransactionManager代替。因此@Transactional停止工作:RuntimeExceptions不再回滚。

ChainedTransactionManager按预期方式工作。请记住,这不是XA事务,而是一个简单的链接的1-阶段提交。

本文链接:https://www.f2er.com/2954035.html

大家都在问