Spring Kafka ChainedKafkaTransactionManager doesn't synchronize with the JPA Spring-data transaction

I have read a lot of Gary Russell's answers and posts, but haven't found an actual solution for the common use case of synchronizing the following sequence:

receive from topic A => save to DB via Spring-data => send to topic B

As I understand it correctly: fully atomic processing cannot be guaranteed in this case, and I need to handle message deduplication on the client side. But the main problem is that the ChainedKafkaTransactionManager doesn't synchronize with the JpaTransactionManager (see the @KafkaListener below).
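Since exactly-once across Kafka and the DB is not achievable here, the deduplication mentioned above can be sketched as follows (a hypothetical illustration, not from the question: the HashSet stands in for a "processed_offsets" table that, in a real application, would be written in the same JPA transaction as the business entity, so a rollback also forgets the offset):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: deduplicate on the consumer side by recording each
// record's coordinates (topic/partition/offset). The Set is a stand-in for a
// DB table that shares the JPA transaction with the business data.
class IdempotentProcessor {

    private final Set<String> processedOffsets = new HashSet<>();

    /** Returns true if the record was processed, false if it was a duplicate. */
    boolean process(String topic, int partition, long offset, byte[] payload) {
        String key = topic + "-" + partition + "-" + offset;
        if (!processedOffsets.add(key)) {
            return false; // already seen: redelivery after a crash or rollback
        }
        // ... persist payload via Spring Data here ...
        return true;
    }
}
```

The first delivery of a given offset is processed; a redelivery of the same offset is detected and skipped.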

Kafka configuration:

@Production
@EnableKafka
@Configuration
@EnableTransactionManagement
public class KafkaConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaConfig.class);

    @Bean
    public ConsumerFactory<String,byte[]> commonConsumerFactory(@Value("${kafka.broker}") String bootstrapServer) {

        Map<String,Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);

        props.put(AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(SESSION_TIMEOUT_MS_CONFIG,10000);
        props.put(ENABLE_AUTO_COMMIT_CONFIG,false);
        props.put(MAX_POLL_RECORDS_CONFIG,10);
        props.put(MAX_POLL_INTERVAL_MS_CONFIG,17000);
        props.put(FETCH_MIN_BYTES_CONFIG,1048576);
        props.put(FETCH_MAX_WAIT_MS_CONFIG,1000);
        props.put(ISOLATION_LEVEL_CONFIG,"read_committed");

        props.put(KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG,ByteArrayDeserializer.class);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,byte[]> kafkaListenerContainerFactory(
            @Qualifier("commonConsumerFactory") ConsumerFactory<String,byte[]> consumerFactory,
            @Qualifier("chainedKafkaTM") ChainedKafkaTransactionManager chainedKafkaTM,
            @Qualifier("kafkaTemplate") KafkaTemplate<String,byte[]> kafkaTemplate,
            @Value("${kafka.concurrency:#{T(java.lang.Runtime).getRuntime().availableProcessors()}}") Integer concurrency
    ) {

        ConcurrentKafkaListenerContainerFactory<String,byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setMissingTopicsFatal(false);
        factory.getContainerProperties().setTransactionManager(chainedKafkaTM);

        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        var arbp = new DefaultAfterRollbackProcessor<String,byte[]>(new FixedBackOff(1000L,3));
        arbp.setCommitRecovered(true);
        arbp.setKafkaTemplate(kafkaTemplate);

        factory.setAfterRollbackProcessor(arbp);
        factory.setConcurrency(concurrency);

        factory.afterPropertiesSet();

        return factory;
    }

    @Bean
    public ProducerFactory<String,byte[]> producerFactory(@Value("${kafka.broker}") String bootstrapServer) {

        Map<String,Object> configProps = new HashMap<>();

        configProps.put(BOOTSTRAP_SERVERS_CONFIG,bootstrapServer);

        configProps.put(BATCH_SIZE_CONFIG,16384);
        configProps.put(ENABLE_IDEMPOTENCE_CONFIG,true);

        configProps.put(KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        configProps.put(VALUE_SERIALIZER_CLASS_CONFIG,ByteArraySerializer.class);

        var kafkaProducerFactory = new DefaultKafkaProducerFactory<String,byte[]>(configProps);
        kafkaProducerFactory.setTransactionIdPrefix("kafka-tx-");

        return kafkaProducerFactory;
    }

    @Bean
    public KafkaTemplate<String,byte[]> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String,byte[]> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public KafkaTransactionManager kafkaTransactionManager(@Qualifier("producerFactory") ProducerFactory<String,byte[]> producerFactory) {
        KafkaTransactionManager ktm = new KafkaTransactionManager<>(producerFactory);
        ktm.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

    @Bean
    public ChainedKafkaTransactionManager chainedKafkaTM(JpaTransactionManager jpaTransactionManager,KafkaTransactionManager kafkaTransactionManager) {
        return new ChainedKafkaTransactionManager(kafkaTransactionManager,jpaTransactionManager);
    }

    @Bean(name = "transactionManager")
    public JpaTransactionManager transactionManager(EntityManagerFactory em) {
        return new JpaTransactionManager(em);
    }
}

Kafka listener:

@KafkaListener(groupId = "${group.id}",idIsGroup = false,topics = "${topic.name.import}")
public void consume(List<byte[]> records,@Header(KafkaHeaders.OFFSET) Long offset) {
    for (byte[] record : records) {
        // causes an infinite rollback loop (perhaps due to the batch listener)
        if (true)
            throw new RuntimeException("foo");

        // Spring-data storage with @Transactional("chainedKafkaTM"), since Spring-data
        // can't choose a TM among transactionManager, chainedKafkaTM and kafkaTransactionManager
        var result = storageService.persist(record);

        kafkaTemplate.send(result);
    }
}

Spring-Kafka version: 2.3.3. Spring Boot version: 2.2.1.

What is the proper way to implement this use case? The Spring-Kafka documentation is limited to small/specific examples.

P.S. When I use @Transactional(transactionManager = "chainedKafkaTM", rollbackFor = Exception.class) on the @KafkaListener method, I face an endless cyclic rollback, even though FixedBackOff(1000L, 3L) is set.

EDIT: I'm planning to achieve the maximum affordable synchronization between listener, producer and database, with a configurable number of retries.

EDIT: The code snippets above have been edited according to the suggested configuration. Using an ARBP doesn't solve the infinite rollback cycle for me, because the predicate of the first statement is always false (SeekUtils.doSeeks):

DefaultAfterRollbackProcessor
...
@Override
    public void process(List<ConsumerRecord<K,V>> records,Consumer<K,V> consumer,Exception exception,boolean recoverable) {

        if (SeekUtils.doSeeks(((List) records),consumer,exception,recoverable,getSkipPredicate((List) records,exception),LOGGER)
                    && isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
            ConsumerRecord<K,V> skipped = records.get(0);
            this.kafkaTemplate.sendOffsetsToTransaction(
                    Collections.singletonMap(new TopicPartition(skipped.topic(),skipped.partition()),new OffsetAndMetadata(skipped.offset() + 1)));
        }
    }

It's worth mentioning that there is no active transaction inside the Kafka consumer method (TransactionSynchronizationManager.isActualTransactionActive() returns false).

Answer:

What makes you think it's not synchronized? You don't really need @Transactional, since the container will start both transactions.

You should not use a SeekToCurrentErrorHandler with transactions, because that runs within the transaction; configure the after-rollback processor instead. The default ARBP uses FixedBackOff(0L, 9) (10 attempts).

This works fine for me; it stops after 4 delivery attempts:

@SpringBootApplication
public class So58804826Application {

    public static void main(String[] args) {
        SpringApplication.run(So58804826Application.class,args);
    }

    @Bean
    public JpaTransactionManager transactionManager() {
        return new JpaTransactionManager();
    }


    @Bean
    public ChainedKafkaTransactionManager<?,?> chainedTxM(JpaTransactionManager jpa,KafkaTransactionManager<?,?> kafka) {

        kafka.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return new ChainedKafkaTransactionManager<>(kafka,jpa);
    }

    @Autowired
    private Saver saver;

    @KafkaListener(id = "so58804826",topics = "so58804826")
    public void listen(String in) {
        System.out.println("Storing: " + in);
        this.saver.save(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so58804826")
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String,String> template) {
        return args -> {
//          template.executeInTransaction(t -> t.send("so58804826","foo"));
        };
    }

}

@Component
class ContainerFactoryConfigurer {

    ContainerFactoryConfigurer(ConcurrentKafkaListenerContainerFactory<?,?> factory,ChainedKafkaTransactionManager<?,?> tm) {

        factory.getContainerProperties().setTransactionManager(tm);
        factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<>(new FixedBackOff(1000L,3)));
    }

}

@Component
class Saver {

    @Autowired
    private MyEntityRepo repo;

    private final AtomicInteger ids = new AtomicInteger();

    @Transactional("chainedTxM")
    public void save(String in) {
        this.repo.save(new MyEntity(in,this.ids.incrementAndGet()));
        throw new RuntimeException("foo");
    }

}

I see "Participating in existing transaction" in both TxMs.

With @Transactional("transactionManager"), I just get it from the JPA TM.

EDIT

For a batch listener there is no concept of "recovery": the framework has no idea which record in the batch needs to be skipped. In 2.3, we added a new feature for batch listeners when using MANUAL ack modes.

See Committing Offsets:

Starting with version 2.3, the Acknowledgment interface has two additional methods, nack(long sleep) and nack(int index, long sleep). The first one is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.

When using a batch listener, you can specify the index within the batch where the failure occurred. When nack() is called, offsets are committed for the records before the index, and seeks are performed on the partitions of the failed and discarded records so that they are redelivered on the next poll(). This is an improvement over the SeekToCurrentBatchErrorHandler, which can only seek the entire batch for redelivery.
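As a sketch of how that API is used (assuming Spring Kafka 2.3+ and a batch container factory configured with AckMode.MANUAL; the listener id, topic and process() method are invented for illustration):

```java
// Sketch only: assumes a MANUAL-ack batch container factory named "batchFactory";
// listener id, topic and process() are hypothetical.
@KafkaListener(id = "batchNack", topics = "topicA", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, byte[]>> records, Acknowledgment ack) {
    for (int i = 0; i < records.size(); i++) {
        try {
            process(records.get(i));
        }
        catch (Exception e) {
            // commits offsets for records before index i, then seeks so that
            // records from i on are redelivered on the next poll(), after 1s
            ack.nack(i, 1000);
            return;
        }
    }
    ack.acknowledge(); // the whole batch succeeded
}
```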

However, the failed record will still be replayed indefinitely.

You could keep track of the record that keeps failing and skip it by nacking at index + 1.
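That bookkeeping could look like the following (a hypothetical helper, not part of Spring Kafka; the class name and maxAttempts threshold are assumptions):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: count failures per record coordinate and, once a record
// has failed maxAttempts times, return index + 1 for Acknowledgment.nack() so
// that offsets are committed past the poison record and it is skipped.
class BatchFailureTracker {

    private final Map<String, Integer> attempts = new HashMap<>();
    private final int maxAttempts;

    BatchFailureTracker(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    /** Returns the index to pass to Acknowledgment.nack(index, sleep). */
    int nackIndex(String topic, int partition, long offset, int indexInBatch) {
        String key = topic + "-" + partition + "-" + offset;
        int count = attempts.merge(key, 1, Integer::sum);
        if (count >= maxAttempts) {
            attempts.remove(key);    // forget the record once it is skipped
            return indexInBatch + 1; // skip: commit past the poison record
        }
        return indexInBatch;         // retry: redeliver from the failing record
    }
}
```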

But since your JPA tx is rolled back too, that won't work for you.

With a batch listener, you have to handle batch problems in your listener code.
