如何在同一主题上使用globalKtable和StateStore?

请澄清一下,我是Kafka的新手,很抱歉,如果我的问题似乎没有记载,我正在阅读教程,文档以及我能理解的所有内容。

我试图从GlobalStore读取所有值以更新其值,然后使用已经存在的StateStore放置这些新的更新值。

我要这样做是因为

this.stateStore.all();

我只有1/10的数据,如果我理解正确的话,这是因为我有10个分区,而ss只读取一个(虽然我不完全理解为什么)

这是我的globalTable:

    public StreamsBuilder declareTopology(StreamsBuilder builder) {

        logger.debug("Building topology : input topic ~ {} ; output topics ~ {},{}",getInputTopic(),getDataTopic(),getToEsTopic());

        builder.globalTable(
                getDataTopic(),Consumed.with(Serdes.String(),fooSerdes)
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),Materialized.<String,Foo,KeyValueStore<Bytes,byte[]>>as(
                        "foosktable")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(fooSerdes)
                        .withLoggingEnabled(new HashMap<>()));
    ...

这是addStateStore,我无法删除它,因为它已在代码的其他地方使用:

       ...

       builder.addStateStore(
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore("foosktable"),Serdes.String(),fooSerdes));
    ...

    return builder;
}

因此,从理论上讲,我当时正在考虑的是删除同样使用同一主题的StateStore,并使用我的data.process主题之一放置数据,问题是该处理器对此做了其他事情StateStore,所以我不能uke它。

我在这里迷路了,任何光线都会帮上大忙。谢谢!

da4321 回答:如何在同一主题上使用globalKtable和StateStore?

目前尚不清楚您实际要达到的目标。但是,有一些高级解释:

GlobalKTable仅具有一个目的:从主题中读取数据而无需修改即可进行KStream-GlobalKTable-join或通过“交互式查询”查询商店。

因此,您无法真正做您想做的事,因为无法按照您的意图将数据从全局存储复制到另一个存储。您将需要复制输入主题并阅读两次:(1)以GlobalKTable的身份和(2)以常规KStream的格式来修改数据,然后再将其放入存储中。对于(2),您可以使用transform()

希望这会有所帮助。

本文链接:https://www.f2er.com/3169166.html

大家都在问