Apache Beam KafkaIO batch mode OOM problem

I have a use case where I want to read data from Kafka with Apache Beam in batch mode, using the Spark runner.

Using the withMaxNumRecords(long) method of KafkaIO, the unbounded Kafka source can be treated as a bounded source. But I found that in batch mode, the data is first read from each partition and held in memory, and only then passed on to the next operations (map, filter, etc.).
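For context, a stripped-down version of the read I am describing looks roughly like this (the broker address, topic name, and record cap below are placeholders, not my real values):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaBatchRead {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("kafka:9092")            // placeholder broker
        .withTopic("my-topic")                         // placeholder topic
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        // withMaxNumRecords bounds the otherwise unbounded Kafka source,
        // which is what allows the pipeline to run as a batch job.
        .withMaxNumRecords(5_000_000L));               // placeholder cap

    p.run().waitUntilFinish();
  }
}
```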

There is a large amount of data in each partition, and when reading it in batch mode I get an OOM error. I have tried increasing the executor memory, but I cannot configure a value for this parameter that works for every run. Another thing is that I am able to read the same data in streaming mode.

I think this is happening because, in batch mode, all records from each partition are assigned to the GlobalWindow (part of the ProcessContext), which only fires once all the data has been read. This may be what is causing the OOM.
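To check which window each element lands in, I used a small DoFn along these lines (just a diagnostic sketch; with a bounded read and no explicit Window.into(...), it prints the GlobalWindow):

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

// Logs the window each element was assigned to. Beam injects the
// BoundedWindow parameter into @ProcessElement automatically.
class LogWindowFn<T> extends DoFn<T, T> {
  @ProcessElement
  public void processElement(ProcessContext c, BoundedWindow window) {
    System.out.println("element in window: " + window);
    c.output(c.element());
  }
}
```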

If this is the cause, how can I change the GlobalWindow in the ProcessContext to a window per partition?

If this is not the cause, how can I read large amounts of data from Kafka in batch mode with Apache Beam, without increasing the executor memory for every run?

Answer by lz0738:

From the documentation:

You can use windowing with fixed-size data sets in bounded PCollections. However, note that windowing considers only the implicit timestamps attached to each element of a PCollection, and data sources that create fixed data sets (such as TextIO) assign the same timestamp to every element. This means that all the elements are by default part of a single, global window.

To use windowing with fixed data sets, you can assign your own timestamps to each element. To assign timestamps to elements, use a ParDo transform with a DoFn that outputs each element with a new timestamp (for example, the WithTimestamps transform in the Beam SDK for Java).

Here is an example of how to define windows for a bounded data set: https://beam.apache.org/get-started/wordcount-example/#unbounded-and-bounded-datasets
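A minimal, self-contained sketch of that idea, using Create as a stand-in for your Kafka read (the timestamp field and the one-minute window size are illustrative assumptions, not requirements):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class WindowedBoundedRead {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Stand-in for the bounded Kafka read: (timestampMillis, payload) pairs.
    PCollection<KV<Long, String>> records = p.apply(Create.of(
        KV.of(1_700_000_000_000L, "a"),
        KV.of(1_700_000_060_000L, "b")));

    records
        // Re-stamp each element with the timestamp carried in the data,
        // since bounded sources give every element the same default timestamp.
        .apply(WithTimestamps.of(kv -> new Instant(kv.getKey())))
        // Break the single global window into fixed one-minute windows so
        // downstream transforms see bounded chunks instead of the whole set.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

    p.run().waitUntilFinish();
  }
}
```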

The documentation also notes that a bounded data set is read once in its entirety and processed as a finite batch job, as described below:

The bounded (or unbounded) nature of your PCollection affects how Beam processes your data. A bounded PCollection can be processed using a batch job, which might read the entire data set once, and perform processing in a job of finite length. An unbounded PCollection must be processed using a streaming job that runs continuously, as the entire collection can never be available for processing at any one time.

I believe that to solve your problem, you can try setting a window on the bounded collection and then run it on Cloud Dataflow to see whether that works. You can also refer to the Beam Capability Matrix to see which features the Spark runner supports.
