我正在窗口中查看流环境中的传入对象,并进行收集和打印。 使用kafka抑制可避免出现中间结果。
使用抑制后,我将无法输出任何信息。如果我注释掉抑制代码,则代码可以正常工作,但会输出中间结果。
import com.savk.workout.kafka.streams.kafkastreamsworkout.config.ConfigUtils;
import com.savk.workout.kafka.streams.kafkastreamsworkout.model.Observation;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Properties;
@Component
public class ObservationAnalyser {
@PostConstruct
public void initialize() {
StreamsBuilder streamsBuilder;
KStream<String,Observation> observationKStream;
String observationSerde = ConfigUtils.getObservationSerde(); //TODO : Should we move to a JSON Serde?
Properties kafkaStreamProperties = ConfigUtils.getKafkaStreamConfig();
kafkaStreamProperties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getclass().getName());
kafkaStreamProperties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,observationSerde);
//JsonSerde<ObservationCollector> observationCollectorJsonSerde = new JsonSerde<>(ObservationCollector.class);
streamsBuilder = new StreamsBuilder();
observationKStream = streamsBuilder.stream(ConfigUtils.KAFKA_SOURCE_TOPIC);
observationKStream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(ConfigUtils.ONE_MINUTE_IN_MILLISECONDS)))
.aggregate(
() -> new ObservationCollector(),(key,value,observationCollector) -> observationCollector.addObservations(value),Materialized.with(Serdes.String(),new JsonSerde<>(ObservationCollector.class))
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) //AFTER COMMENTING THIS LINE,I CAN SEE THE OUTPUT
.toStream((key,value) -> key.key())
.foreach((key,observationCollector) -> {
System.out.println("Key :: " + key);
for(Observation observation : observationCollector.getObservations()) {
System.out.println("Observation :: " + observation);
}
});
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(),kafkaStreamProperties);
kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
}
}
我无法弄清楚是什么问题/问题或找不到任何资源来进行故障排除。