我如何为Kafka Consumer使用Dynamic主题

在Kafka的配置下找到服务器     @豆角,扁豆     public ConsumerFactory ConsumerFactory(){         地图配置= new HashMap ();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG,"group_id");
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(config);
}


@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String,String> factory = new 
    ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}



here I need to change the topic as a dynamic content
@KafkaListener(topics = "mytopic")
private void consume2(String Data) {
    String userID = null;
    String topic = null;
    String message = null;
    System.out.println(Data);
}
wkz123456 回答:我如何为Kafka Consumer使用Dynamic主题

您尝试过类似的事情

@KafkaListener(topics =“ $ {kafka.topics}”)

并将其设置为环境变量或应用程序属性吗?

环境变量KAFKA_TOPICS = mytopic application.properties kafka.topics = mytopic

本文链接:https://www.f2er.com/3151465.html

大家都在问