使用 Kafka Binder 在 Spring 云中打印 JsonObject

我是 Spring Cloud 和 kafka 流的新手。我正在尝试使用 kafka 活页夹设置 spring 云应用程序。我尝试在本地测试 kafka 流处理器,但无法打印任何日志。

我的 kafka 消息将包含 JSONObject。 kafkaStreamListener 类是:

@Configuration
public class KafkaStreamListener {

    private static Logger logger = LogManager.getLogger(KafkaStreamListener.class);
    
    //bean for processing autonomous messages 
     @Bean
      public Function<KStream<String,JSONObject>,KStream<String,JSONObject>> autonomousProcessor() {
         System.out.println("start of stream processor%%%%%%%%%%%%%%%%%%%%%**************************");
         logger.info("inside processor");
         return kstream -> kstream.filter((key,value) -> {
         System.out.println(value.toString()); 
         return true;});
             }

应用程序属性:

#Processor group with inputs and outputs
spring.cloud.stream.function.definition = autonomousProcessor
spring.cloud.stream.bindings.autonomousProcessor-in-0.destination = INPUT_TOPIC
spring.cloud.stream.bindings.autonomousProcessor-out-0.destination = OUTPUT_TOPIC
spring.cloud.stream.kafka.streams.binder.functions.autonomousProcessor.application-id= autonomousProcessorGroup

问题: 在调试模式下,断点直接到达过滤步骤而不是不动作。它跳过记录器和 SOP。不知道可能是什么问题。 Spring 云版本:Hoxton.SR11

chen362015 回答:使用 Kafka Binder 在 Spring 云中打印 JsonObject

我认为您所看到的是正确的行为。您的函数将在引导时由绑定器调用一次,然后将调用初始 SOP 和记录器(再次仅调用一次)。如果您在启动应用程序时在第一个 SOP 或记录器上设置断点,您将看到它们被调用。然后当 Kafka 主题接收到数据时,将调用提供的 lambda(带过滤器)。过滤器中的内部 SOP 应在每次调用文件管理器时记录 value.toString() 每条记录。

本文链接:https://www.f2er.com/34163.html

大家都在问