请澄清一下,我是Kafka的新手,很抱歉,如果我的问题似乎没有记载,我正在阅读教程,文档以及我能理解的所有内容。
我试图从GlobalStore读取所有值以更新其值,然后使用已经存在的StateStore放置这些新的更新值。
我要这样做是因为
this.stateStore.all();
我只有1/10的数据,如果我理解正确的话,这是因为我有10个分区,而ss只读取一个(虽然我不完全理解为什么)
这是我的globalTable:
public StreamsBuilder declareTopology(StreamsBuilder builder) {
logger.debug("Building topology : input topic ~ {} ; output topics ~ {},{}",getInputTopic(),getDataTopic(),getToEsTopic());
builder.globalTable(
getDataTopic(),Consumed.with(Serdes.String(),fooSerdes)
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),Materialized.<String,Foo,KeyValueStore<Bytes,byte[]>>as(
"foosktable")
.withKeySerde(Serdes.String())
.withValueSerde(fooSerdes)
.withLoggingEnabled(new HashMap<>()));
...
这是addStateStore,我无法删除它,因为它已在代码的其他地方使用:
...
builder.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("foosktable"),Serdes.String(),fooSerdes));
...
return builder;
}
因此,从理论上讲,我当时正在考虑的是删除同样使用同一主题的StateStore,并使用我的data.process主题之一放置数据,问题是该处理器对此做了其他事情StateStore,所以我不能uke它。
我在这里迷路了,任何光线都会帮上大忙。谢谢!