为什么即使分区较小,具有 Chained withColumn 窗口聚合的 Spark Stage 也会继续运行 OOM?

我在 spark Job 中有一个 Stage,它包含一长串窗口聚合,无论我添加多少个分区,这些聚合都会一直失败。

我的集群配置是一个 ```48 Node(r5.2xlarge) EMR 集群,每个集群有:

  • 64 GB 内存
  • 每个 8vCpu

spark 2.4 设置为:

  • 执行器内存 16g,开销 2g
  • 驱动内存 8g,开销 2g
  • 自动广播 -1

执行计划如下:

enter image description here

任务的核心是使用 withColumn 聚合在 2 种类型的分区上添加新列。我已经通过 4 个公共分区键中的 2 个对数据集进行了重新分区;这避免了任何洗牌。

w = W.partitionBy("a","b","c").orderBy("d")
w_2 = W.partitionBy("a","c").orderBy("d")

calculated_outcome = data.withColumn("x",F.when(
                F.col("data_column").isnotNull(),F.sum("sum_columns").over(w.rangeBetween(-90,0)),).otherwise(None)).withColumn("x_2",F.sum("sum_columns").over(w.rangeBetween(-60,).otherwise(None)).withColumn("x_3",F.sum("sum_columns").over(w.rangeBetween(-30,).otherwise(None)).withColumn("y",F.when(
                F.col("data_column_2").isnotNull(),).otherwise(None)).withColumn("y_2",).otherwise(None)).withColumn("y_3",).otherwise(None)).withColumn("z",).otherwise(None)).withColumn("z_2",).otherwise(None)).withColumn("z_3",).otherwise(None))

return calculated_outcome.groupBy("a","c").agg(*F.sum([x for x in [columns_to_sum_avg_etc]))

在上面的例子中,我已经给出了转换的主要内容(sum,avg,lag)。窗口函数中总共有11个withColumns,下面groupie中有大约15个聚合列。

如果需要,很乐意添加更多信息!

hajunpeng 回答:为什么即使分区较小,具有 Chained withColumn 窗口聚合的 Spark Stage 也会继续运行 OOM?

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/3261.html

大家都在问