When Spark reads a file from HDFS, it creates a single partition for each input split. The input splits are determined by the Hadoop InputFormat used to read the file. For example, if you use textFile(), the format is Hadoop's TextInputFormat, which gives you one partition per HDFS block (though the split between partitions falls on line boundaries rather than exact block boundaries), unless you have a compressed text file. For a compressed file, the number of partitions depends on the compression type (a non-splittable codec such as gzip yields a single partition).
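As a rough illustration of the block-based rule above, here is a minimal sketch (the helper name and the rule of thumb are assumptions, not Spark API): for an uncompressed, splittable text file, the partition count is approximately the file size divided by the HDFS block size, rounded up.

```scala
// Sketch (assumption): one partition per HDFS block for an uncompressed,
// splittable text file. approxPartitions is a hypothetical helper, not Spark API.
def approxPartitions(fileSizeBytes: Long, blockSizeBytes: Long): Long =
  math.max(1L, (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes)

// e.g. a 1 GiB file with the common 128 MiB HDFS block size
val parts = approxPartitions(1L << 30, 128L << 20)
println(parts) // 8
```

The actual split boundaries are adjusted to line boundaries by TextInputFormat, so partition sizes are only approximately one block each.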
A few other parameters, mentioned below, generally apply to RDDs rather than DataFrames.
spark.default.parallelism - For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD. For operations like parallelize with no parent RDDs, it depends on the cluster manager:
Local mode: number of cores on the local machine
Mesos fine grained mode: 8
Others: total number of cores on all executor nodes or 2, whichever is larger
and
spark.sql.shuffle.partitions - controls the number of partitions for operations on DataFrames (default is 200)
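The fallback rules for spark.default.parallelism listed above can be sketched as a small pure function (the trait and function names here are hypothetical, for illustration only):

```scala
// Hypothetical model of the spark.default.parallelism fallback rules
// described above; these types are illustrative, not Spark API.
sealed trait ClusterManager
case object LocalMode extends ClusterManager
case object MesosFineGrained extends ClusterManager
case class Other(totalExecutorCores: Int) extends ClusterManager

def defaultParallelism(cm: ClusterManager, localCores: Int): Int = cm match {
  case LocalMode         => localCores              // cores on the local machine
  case MesosFineGrained  => 8                       // fixed default
  case Other(totalCores) => math.max(totalCores, 2) // total cores or 2, whichever is larger
}
```

Note that spark.sql.shuffle.partitions is a separate knob that only affects DataFrame/Dataset shuffles, not RDD operations.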
Once the number of partitions is defined, each partition is processed by one task, and each task runs on an executor instance. With dynamic allocation, the number of executor instances is driven by the number of partitions, which can change at each stage of the DAG's execution.
If you want to control the number of executors with dynamic allocation turned on, you can set the following configurations in the Spark defaults configuration file.
Property | Default | Meaning
spark.dynamicAllocation.initialExecutors | spark.dynamicAllocation.minExecutors | Initial number of executors to run if dynamic allocation is enabled.
spark.dynamicAllocation.maxExecutors | infinity | Upper bound for the number of executors if dynamic allocation is enabled.
spark.dynamicAllocation.minExecutors | 0 | Lower bound for the number of executors if dynamic allocation is enabled.
You should set spark.dynamicAllocation.maxExecutors to control the maximum number of executors that can be spawned in an EMR cluster.
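For reference, a spark-defaults.conf fragment might look like the following (the numeric values here are illustrative assumptions; choose bounds that fit your cluster):

```
# Illustrative example only -- tune these values for your workload
spark.dynamicAllocation.enabled           true
spark.dynamicAllocation.minExecutors      2
spark.dynamicAllocation.initialExecutors  4
spark.dynamicAllocation.maxExecutors      20
```

On EMR these can also be supplied through the spark-defaults classification when creating the cluster.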
ExecutorAllocationManager has a fairly involved algorithm for determining the number of executors to spawn:
private def updateAndSyncNumExecutorsTarget(now: Long): Int = synchronized {
  val maxNeeded = maxNumExecutorsNeeded

  if (initializing) {
    // Do not change our target while we are still initializing,
    // Otherwise the first job may have to ramp up unnecessarily
    0
  } else if (maxNeeded < numExecutorsTarget) {
    // The target number exceeds the number we actually need, so stop adding new
    // executors and inform the cluster manager to cancel the extra pending requests
    val oldNumExecutorsTarget = numExecutorsTarget
    numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
    numExecutorsToAdd = 1

    // If the new target has not changed, avoid sending a message to the cluster manager
    if (numExecutorsTarget < oldNumExecutorsTarget) {
      client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
      logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
        s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
    }
    numExecutorsTarget - oldNumExecutorsTarget
  } else if (addTime != NOT_SET && now >= addTime) {
    val delta = addExecutors(maxNeeded)
    logDebug(s"Starting timer to add more executors (to " +
      s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
    addTime += sustainedSchedulerBacklogTimeoutS * 1000
    delta
  } else {
    0
  }
}
In the code snippet above, the high-level flow is as follows:
maxNeeded = (running + pending tasks) / tasks per executor
If this value is less than the current numExecutorsTarget (whose starting value is derived from the initial executors setting and spark.executor.instances), the target is lowered, but never below minNumExecutors, and the cluster manager is told to cancel the extra pending requests. Otherwise, the first request to add executors is made once spark.dynamicAllocation.schedulerBacklogTimeout has elapsed, and every subsequent request is made on the interval defined by spark.dynamicAllocation.sustainedSchedulerBacklogTimeout; the number of executors requested each round ramps up in multiples of 2 (exponential ramp-up, the opposite of exponential backoff), capped at the maximum needed.
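The ramp-up schedule described above can be sketched as follows (a simplified model, not Spark's actual code: it ignores the timeouts and assumes every request is fully granted, so the per-round increment simply doubles):

```scala
// Simplified model (assumption) of the exponential ramp-up: starting from a
// request of 1 executor, the increment doubles each round, and the cumulative
// target is capped at the maximum number of executors actually needed.
def rampUpSchedule(current: Int, maxNeeded: Int): List[Int] = {
  def loop(target: Int, toAdd: Int, acc: List[Int]): List[Int] =
    if (target >= maxNeeded) acc.reverse
    else {
      val next = math.min(target + toAdd, maxNeeded)
      loop(next, toAdd * 2, next :: acc)
    }
  loop(current, 1, Nil)
}

println(rampUpSchedule(0, 20)) // List(1, 3, 7, 15, 20)
```

Each element is the executor target after one sustained-backlog interval, which is why a large backlog is absorbed in only a handful of rounds.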
Finally, an important point to note is that the variable addTime is set when a stage is submitted, so any timeout you configure is measured from stage submission, not from the start of the job run.
For the default configurations of an EMR cluster, refer to the documentation available here: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html