I have read a great deal of Gary Russell's answers and posts, but could not find an actual solution for the common use case of synchronizing the following sequence:
receive from topic A => save to DB via Spring Data => send to topic B
As far as I understand, fully atomic processing cannot be guaranteed in this scenario and I need to handle message deduplication on the consumer side; the main problem, however, is that the ChainedKafkaTransactionManager does not synchronize with the JpaTransactionManager (see the @KafkaListener below).
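Since exactly-once across Kafka and the DB cannot be guaranteed, consumer-side deduplication is needed. A minimal plain-Java sketch of such a deduplicator (a hypothetical helper, keyed by topic/partition/offset; in a real system the key would rather be a business identifier persisted in the same JPA transaction as the payload):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical consumer-side deduplicator: remembers which records were
// already processed, so a redelivered record is handled only once.
public class OffsetDeduplicator {

    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /** Returns true the first time a record key is seen, false on redelivery. */
    public boolean markIfNew(String topic, int partition, long offset) {
        return processed.add(topic + "-" + partition + "@" + offset);
    }
}
```

With something like this, a redelivered batch can skip records that were already persisted before the previous transaction rolled back.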
Kafka configuration:
@Production
@EnableKafka
@Configuration
@EnableTransactionManagement
public class KafkaConfig {

    private static final Logger log = LoggerFactory.getLogger(KafkaConfig.class);

    @Bean
    public ConsumerFactory<String, byte[]> commonConsumerFactory(@Value("${kafka.broker}") String bootstrapServer) {
        Map<String, Object> props = new HashMap<>();
        props.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
        props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(MAX_POLL_RECORDS_CONFIG, 10);
        props.put(MAX_POLL_INTERVAL_MS_CONFIG, 17000);
        props.put(FETCH_MIN_BYTES_CONFIG, 1048576);
        props.put(FETCH_MAX_WAIT_MS_CONFIG, 1000);
        props.put(ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(
            @Qualifier("commonConsumerFactory") ConsumerFactory<String, byte[]> consumerFactory,
            @Qualifier("chainedKafkaTM") ChainedKafkaTransactionManager chainedKafkaTM,
            @Qualifier("kafkaTemplate") KafkaTemplate<String, byte[]> kafkaTemplate,
            @Value("${kafka.concurrency:#{T(java.lang.Runtime).getRuntime().availableProcessors()}}") Integer concurrency) {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setMissingTopicsFatal(false);
        factory.getContainerProperties().setTransactionManager(chainedKafkaTM);
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        var arbp = new DefaultAfterRollbackProcessor<String, byte[]>(new FixedBackOff(1000L, 3));
        arbp.setCommitRecovered(true);
        arbp.setKafkaTemplate(kafkaTemplate);
        factory.setAfterRollbackProcessor(arbp);
        factory.setConcurrency(concurrency);
        factory.afterPropertiesSet();
        return factory;
    }

    @Bean
    public ProducerFactory<String, byte[]> producerFactory(@Value("${kafka.broker}") String bootstrapServer) {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        configProps.put(BATCH_SIZE_CONFIG, 16384);
        configProps.put(ENABLE_IDEMPOTENCE_CONFIG, true);
        configProps.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        var kafkaProducerFactory = new DefaultKafkaProducerFactory<String, byte[]>(configProps);
        kafkaProducerFactory.setTransactionIdPrefix("kafka-tx-");
        return kafkaProducerFactory;
    }

    @Bean
    public KafkaTemplate<String, byte[]> kafkaTemplate(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public KafkaTransactionManager kafkaTransactionManager(@Qualifier("producerFactory") ProducerFactory<String, byte[]> producerFactory) {
        KafkaTransactionManager ktm = new KafkaTransactionManager<>(producerFactory);
        ktm.setTransactionSynchronization(SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        return ktm;
    }

    @Bean
    public ChainedKafkaTransactionManager chainedKafkaTM(JpaTransactionManager jpaTransactionManager,
            KafkaTransactionManager kafkaTransactionManager) {
        return new ChainedKafkaTransactionManager(kafkaTransactionManager, jpaTransactionManager);
    }

    @Bean(name = "transactionManager")
    public JpaTransactionManager transactionManager(EntityManagerFactory em) {
        return new JpaTransactionManager(em);
    }
}
Kafka listener:
@KafkaListener(groupId = "${group.id}", idIsGroup = false, topics = "${topic.name.import}")
public void consume(List<byte[]> records, @Header(KafkaHeaders.OFFSET) Long offset) {
    for (byte[] record : records) {
        // causes infinite rollback (perhaps due to the batch listener)
        if (true)
            throw new RuntimeException("foo");
        // Spring Data storage with @Transactional("chainedKafkaTM"), since Spring Data
        // cannot choose a TM among transactionManager, chainedKafkaTM and kafkaTransactionManager
        var result = storageService.persist(record);
        kafkaTemplate.sendDefault(result); // assumes topic B is set as the template's default topic
    }
}
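For reference, the retry-then-recover contract I expect from the DefaultAfterRollbackProcessor configured above with FixedBackOff(1000L, 3) can be sketched in plain Java (illustrative names and a synchronous loop; the real processor re-seeks the consumer so the record is redelivered on the next poll):

```java
import java.util.function.Consumer;

// Plain-Java sketch of "retry a failing record a bounded number of times,
// then hand it to a recoverer and move on". With FixedBackOff(interval, 3)
// that means one initial delivery plus three retries, i.e. four attempts.
public class RetryThenRecover {

    /** Returns the number of delivery attempts made for the record. */
    public static int deliver(byte[] record, int maxRetries,
                              Consumer<byte[]> listener, Consumer<byte[]> recoverer) {
        int attempts = 0;
        while (attempts <= maxRetries) {     // initial delivery + maxRetries retries
            attempts++;
            try {
                listener.accept(record);
                return attempts;             // processed successfully
            } catch (RuntimeException ex) {
                // the transaction would roll back here; seek back and retry
            }
        }
        recoverer.accept(record);            // e.g. log it or publish to a dead-letter topic
        return attempts;
    }
}
```

The open question is why, in my setup, this bound is never reached and the rollback loops forever.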
Spring-kafka version: 2.3.3. Spring Boot version: 2.2.1.
What is the correct way to implement this use case? The Spring Kafka documentation is limited to small/specific examples.
P.S. When I put @Transactional(transactionManager = "chainedKafkaTM", rollbackFor = Exception.class) on the @KafkaListener method, I run into an endless cyclic rollback, even though FixedBackOff(1000L, 3L) is set.
EDIT: I am planning to achieve the maximum affordable synchronization between the listener, the producer and the database, with a configurable number of retries.
EDIT: The snippets above have been edited according to the suggested configuration. Using the ARBP did not fix my infinite rollback cycle, because the predicate of the first statement is always false (SeekUtils.doSeeks):
DefaultAfterRollbackProcessor
...
@Override
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception, boolean recoverable) {
    if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
                getSkipPredicate((List) records, exception), LOGGER)
            && isCommitRecovered() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) {
        ConsumerRecord<K, V> skipped = records.get(0);
        this.kafkaTemplate.sendOffsetsToTransaction(
                Collections.singletonMap(new TopicPartition(skipped.topic(), skipped.partition()),
                        new OffsetAndMetadata(skipped.offset() + 1)));
    }
}
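The offset arithmetic in that last branch: a committed offset designates the next record to be consumed, so committing skipped.offset() + 1 is what makes the consumer skip the recovered record. A minimal illustration with plain types instead of TopicPartition/OffsetAndMetadata:

```java
import java.util.Collections;
import java.util.Map;

// Illustration of the skipped.offset() + 1 logic: the committed offset is
// the position of the NEXT record to read, so failedOffset + 1 ensures the
// failed record is never redelivered to this consumer group.
public class SkipOffsets {

    public static Map<String, Long> offsetsToCommit(String topic, int partition, long failedOffset) {
        return Collections.singletonMap(topic + "-" + partition, failedOffset + 1);
    }
}
```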
It is also worth mentioning that there is no active transaction inside the Kafka Consumer method (TransactionSynchronizationManager.isActualTransactionActive()).