EMR cluster shows too many executors when Spark dynamic allocation is true

I am running Spark jobs in cluster mode on EMR 5.27.0. EMR's Spark dynamic allocation property is set to true.

Now, when I start a Spark job, or even just launch a spark-shell, I can see a large number of executors started in the Spark UI.

Why does this happen even when I am only using spark-shell?

I have tried several things, such as setting spark.dynamicAllocation.initialExecutors = 1, but nothing has worked.

Ideally, dynamic allocation should allocate initialExecutors first, and only launch more executors after spark.dynamicAllocation.schedulerBacklogTimeout has elapsed and the pending/backlogged-task condition is met. Instead, it launches all the executors immediately at startup.

What does the spark.dynamicAllocation.executorAllocationRatio property do? I tried to control the number of executors with it, but it had no effect.

When I start an EMR shell with minExecutors = 1, I get the following log: INFO Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.

The logs show spark.executor.instances = 50.

I checked spark-defaults, but no such property is set there.

Please help me understand this behavior.

ccczzzxxx answered: EMR cluster shows too many executors when Spark dynamic allocation is true

When Spark reads a file from HDFS, it creates a single partition for a single input split. The input split is set by the Hadoop InputFormat used to read the file. For instance, if you use textFile(), that is TextInputFormat in Hadoop, which returns a single partition for a single HDFS block (although the split between partitions is done on line boundaries rather than at the exact block boundary), unless you have a compressed text file. For a compressed file, the number of partitions varies with the compression type.
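As a quick way to see this in practice, here is a minimal sketch for spark-shell (the HDFS path is hypothetical) that checks how many partitions Spark created for a file:

val rdd = sc.textFile("hdfs:///data/events.log")  // hypothetical path
println(s"Partitions: ${rdd.getNumPartitions}")   // roughly one per HDFS block for an uncompressed file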

The few additional parameters mentioned below generally apply to RDDs rather than DataFrames.

spark.default.parallelism - For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD. For operations like parallelize with no parent RDDs, it depends on the cluster manager:
Local mode: number of cores on the local machine
Mesos fine grained mode: 8
Others: total number of cores on all executor nodes or 2, whichever is larger

spark.sql.shuffle.partitions - controls the number of partitions for operations on DataFrames (default is 200)
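As a hedged sketch of the second knob (the data and values are illustrative), lowering spark.sql.shuffle.partitions directly changes how many tasks a DataFrame shuffle produces:

import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.shuffle.partitions", "20")  // down from the default of 200
val counted = spark.range(1000000).toDF("id").groupBy(col("id") % 10).count()
println(counted.rdd.getNumPartitions)                 // 20 shuffle partitions, hence 20 tasks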

Once the number of partitions is determined, each partition is processed by one task, and each task runs on an executor instance. With dynamic allocation, the number of executor instances is driven by the number of partitions, and the number of partitions can change at every stage of the DAG execution.
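As a toy illustration of that relationship (all numbers are made up), a stage over N partitions produces N tasks, and the allocation manager sizes its executor target against that task demand:

val data = sc.parallelize(1 to 1000000, 200)  // 200 partitions -> a 200-task stage
val tasks = data.getNumPartitions
val coresPerExecutor = 4                      // assumed executor sizing
val executorsNeeded = math.ceil(tasks.toDouble / coresPerExecutor).toInt
println(s"$tasks tasks -> up to $executorsNeeded executors may be requested")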

If you want to control the number of executors while dynamic allocation is turned on, you can set the following configurations in the spark-defaults configuration file.

Property                                 | Default                               | Meaning
spark.dynamicAllocation.initialExecutors | spark.dynamicAllocation.minExecutors  | Initial number of executors to run if dynamic allocation is enabled.
spark.dynamicAllocation.maxExecutors     | infinity                              | Upper bound for the number of executors if dynamic allocation is enabled.
spark.dynamicAllocation.minExecutors     | 0                                     | Lower bound for the number of executors if dynamic allocation is enabled.

You should set spark.dynamicAllocation.maxExecutors to cap the maximum number of executors that can be spun up in your EMR cluster.
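For instance, a minimal sketch of bounding allocation from application code (the same keys can go in spark-defaults.conf; the values are illustrative, and dynamic allocation also requires the external shuffle service, which EMR enables by default):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("bounded-dynamic-allocation")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "1")
  .config("spark.dynamicAllocation.initialExecutors", "1")
  .config("spark.dynamicAllocation.maxExecutors", "10")
  .getOrCreate()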

The ExecutorAllocationManager has a fairly involved algorithm for determining the number of executors to spawn:

private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
  val maxNeeded = maxNumExecutorsNeeded

  if (initializing) {
    // Do not change our target while we are still initializing,
    // Otherwise the first job may have to ramp up unnecessarily
    0
  } else if (maxNeeded < numExecutorsTarget) {
    // The target number exceeds the number we actually need, so stop adding new
    // executors and inform the cluster manager to cancel the extra pending requests
    val oldNumExecutorsTarget = numExecutorsTarget
    numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
    numExecutorsToAdd = 1

    // If the new target has not changed, avoid sending a message to the cluster manager
    if (numExecutorsTarget < oldNumExecutorsTarget) {
      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
      logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
        s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
    }
    numExecutorsTarget - oldNumExecutorsTarget
  } else if (addTime != NOT_SET && now >= addTime) {
    val delta = addExecutors(maxNeeded)
    logDebug(s"Starting timer to add more executors (to " +
      s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
    addTime += sustainedSchedulerBacklogTimeoutS * 1000
    delta
  } else {
    0
  }
}

At a high level, the flow in the snippet above is as follows:

maxNeeded = (running tasks + pending tasks) / tasks per executor

If maxNeeded is less than numExecutorsTarget (which is initialized to the larger of initialExecutors and spark.executor.instances), the target is lowered. Otherwise, the initial request for adding executors is triggered after spark.dynamicAllocation.schedulerBacklogTimeout, every subsequent request is made at the interval defined by spark.dynamicAllocation.sustainedSchedulerBacklogTimeout, and the number of executors requested grows in multiples of 2 (an exponential ramp-up, the opposite of exponential backoff), capped at the maximum number of executors.
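A toy model of that ramp-up (plain Scala, not the Spark API; the demand of 50 is made up) shows how quickly the target doubles toward the need:

var target = 1      // start from initialExecutors = 1
var toAdd = 1
val maxNeeded = 50  // demand derived from running + pending tasks
while (target < maxNeeded) {
  target = math.min(target + toAdd, maxNeeded)
  toAdd *= 2
  println(s"requesting a total of $target executors")  // 2, 4, 8, 16, 32, 50
}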

Finally, an important point to note is that the variable addTime is set when a stage is submitted, so whatever timeout you configure is measured not from when the job starts running but from when the stage is submitted.

For the default Spark configuration on EMR clusters, see the documentation here - https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html

