我对此进行了大量研究,但仍然无法找到合适的方法。无论走到哪里,我都发现最简单的方法是调用saveToEs()
,然后再提交偏移量。我的问题是,saveToEs()
是否由于某种原因而失败怎么办?
当我们使用Spark流作业并将文档存储在ES中时,在Kafka中存储偏移量的正确方法是什么。我尝试使用BulkProcessorListener
并手动存储偏移量(跟踪已排序的偏移量和请求以及其他内容),但是这种方法失控了,对于这种一般任务而言,方法似乎很复杂。
有人可以引导我吗?
任何对我的方法感兴趣的人,这是解释它的问题 Commit Offsets to Kafka on Spark Executors