在Kafka的配置下找到服务器 @豆角,扁豆 public ConsumerFactory ConsumerFactory(){ 地图配置= new HashMap ();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG,"group_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String,String> factory = new
ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
here I need to change the topic as a dynamic content
@KafkaListener(topics = "mytopic")
private void consume2(String Data) {
String userID = null;
String topic = null;
String message = null;
System.out.println(Data);
}