我是 Spring Cloud 和 kafka 流的新手。我正在尝试使用 kafka 活页夹设置 spring 云应用程序。我尝试在本地测试 kafka 流处理器,但无法打印任何日志。
我的 kafka 消息将包含 JSONObject。 kafkaStreamListener 类是:
@Configuration
public class KafkaStreamListener {
private static Logger logger = LogManager.getLogger(KafkaStreamListener.class);
//bean for processing autonomous messages
@Bean
public Function<KStream<String,JSONObject>,KStream<String,JSONObject>> autonomousProcessor() {
System.out.println("start of stream processor%%%%%%%%%%%%%%%%%%%%%**************************");
logger.info("inside processor");
return kstream -> kstream.filter((key,value) -> {
System.out.println(value.toString());
return true;});
}
应用程序属性:
#Processor group with inputs and outputs
spring.cloud.stream.function.definition = autonomousProcessor
spring.cloud.stream.bindings.autonomousProcessor-in-0.destination = INPUT_TOPIC
spring.cloud.stream.bindings.autonomousProcessor-out-0.destination = OUTPUT_TOPIC
spring.cloud.stream.kafka.streams.binder.functions.autonomousProcessor.application-id= autonomousProcessorGroup
问题: 在调试模式下,断点直接到达过滤步骤而不是不动作。它跳过记录器和 SOP。不知道可能是什么问题。 Spring 云版本:Hoxton.SR11