-
如何使用Kafka窗口(按时间)将kstream分成多个部分并从最后一个窗口获取记录?
我设法从KStream中获取所有记录,但是我只需要最后一个窗口中的记录。 请给我一个代码示例。 -
Kafka流读取和写入到单独的集群
以前已经回答过类似的问题,但是该解决方案不适用于我的用例。 我们在2个独立的DC中分别运行2 -
Kafka流-尝试从流中过滤数据并写入另一个主题。但是返回空
countInput是此处的KStream。 <pre><code>countInput.print(Printed.toSysOut()); </code></pre> 我得到如下输出 <pre -
卡夫卡流中幂等性与正当性之间的区别
我正在查看文档,了解到我们可以启用<code>idempotence=true</code> 来完成一次准确的交易 <blockquote> -
怀疑NamedCache size()实现会降低性能
我们在应用程序中将Kafka Streams用作流处理引擎。我们的拓扑之一是汇总来自3千万个设备的数据。当应用 -
通过同一应用程序的多个kafka流实例读取规则集主题/分区
我有一个Kafka Stream应用,该应用在主事件主题中进行一些处理,并且还有一个副主题 用于将规则集应用 -
Spring和Kafka流-如何使用查询API
我是kafka和kafka信息流的新手。我有与kafka生产者,消费者,KStream和KTable一起使用的基本Spring服务。现在 -
Kafka Streams并行处理不起作用
我正在学习Kafka Stream,我想构建一个简单的应用程序,该应用程序可以从一个主题读取文本行,并将出 -
来自Kafka流的context()。headers()修改期间的异常使调用点
我有一个使用处理器API的kafka流应用程序。我有一个基于挂钟的标点符号,可以检查本地状态存储中的陈 -
无法删除版本2.3.1
似乎<a href="https://stackoverflow.com/questions/52476045/kafka-streaming-java-nio-file-directorynotemptyexception">kafka streaming : j -
使用Kafka Stream中的State-store(RocksDB)将一条记录转换为多条记录
我想使用状态存储(RocksDB)将一条记录转换为一条以上的记录。我知道有一种方法,例如stream.transform( -
kafka流跳窗口聚合导致时间戳为零的多个窗口
Kafka Streams DSL窗口聚合导致多个窗口。 <pre><code>@StreamListener("input") public void process(KStream<S -
Kafka Streams任务分配
我有一个运行有一个线程的Kafka-Streams应用程序,可以很好地处理一个分区的主题。 我需要运行此 -
在带有Kafka流的窗口中处理项目
我正在尝试使用kafka流在滑动窗口中处理一些事件,但我认为我不了解kafka流的一些详细信息,因此我无 -
按实体删除KTable中的数据
我的任务是通过实体和引用删除KTable中的数据。我正在考虑使用逻辑删除技术在KTable中搜索给定的实体 -
反复更改Kafka Stream SessionWindows的持续时间
假设我有一个带有SessioWindowing的Kafka流,例如: <pre><code>windowedBy(SessionWindows.with(inactivity_time).until(aWee -
根据部分数据属性更新KTable
我正在尝试使用对象的部分数据更新KTable。 例如。用户对象是 <code>{"id":1, "name":"Joe", & -
目前在Kafka中处理具有依赖项的数据时的最佳做法是什么?
我们正在开发一个应用程序,该应用程序将从不同来源获取数据,一旦获得数据,我们便对其进行处理 -
阅读来自Kafka Streams的Peek主题
我有一个主题名称,它是push-processing-KSTREAM-PEEK-0000000014-repartition,这是kafka的内部主题。我没有创建此top -
从头开始阅读有关Kafka流的内部主题
我只想知道我是否可以阅读kafka流内部主题,例如-repartitions或-peek?我可以说主题读xxxx并查看当时的所 -
卡夫卡中的分区选择
我很好奇,如果我有主题A和B,它们具有相同的分区数,那么如果我向主题A发送带有密钥<code>x</code>的消 -
删除下游ChangeLog对象KafkaStreams
我正在尝试删除值为null的记录,在下游的变更日志中,我知道在状态存储中它们只是被null删除了(逻辑 -
消耗多个Kafka主题
我想编写一个Kafka应用程序,该应用程序从主题中使用并将某些内容保存在数据库中。主题由Debezium Kafka -
将有关单个Kafka主题的消息分发给特定消费者
有关单个Kafka主题,单个分区的Avro编码消息。这些消息中的每一个仅由特定使用者使用。例如,关于该 -
Kafka Streams API如何从Schema Registry获得正确的架构?
我尝试了解Kafka Streams API如何与Schema Registry一起使用。 我知道您在设置应用程序时必须指定架构 -
Kafka流用例以添加全局存储
在kafka流中定义拓扑时,可以添加全局状态存储。它需要一个源主题以及一个<code>ProcessorSupplier</code>。 -
春季云流融合KStream Avro消费 更新有关空Organization域对象的问题
我正在尝试使用Spring Boot 2.0将来自kafka主题的融合avro消息作为Kstream来使用。 我可以将消息作为<cod -
Kafka和Kafka Streams:如何在多个租户之间实现事件的公平处理?
就多个租户中给定租户处理的事件数而言,我们如何实现公平的处理保证? 这是用例: 在我们的 -
如何防止Kafka流因处理异常而死亡?
我目前正在使用Kafka流进行POC,在此期间我观察到任何记录处理错误都会关闭kafka流, 我已经通读了Deseri -
Kafka Streams-用于基于日期的过滤的用法
我想考虑在以下情况下在基于Java的Spring服务中使用Kafka和Kafka Streams: <ol> <li>事件始终发布到系统中