Apache Spark:200个Reducer任务如何聚合20000+个映射器输出?

更新的问题

我不清楚的地方=> 在 ShuffleMapStage 中,每个映射器都会创建一个 .data 和一个 .index 文件

这些数据/索引将具有名称,如

shuflle_X_Y_Z

其中

X = shuffle_id

Y = map_id

Z = REDUCER_ID

我了解map_id的范围可以是1-222394

但是如何减少REDUCER_ID

是1-200(例如ResultStage的默认分区)吗?

是=执行者数量吗?

如果它是1-200,那么这200个任务如何知道要读取哪个数据/索引文件?

帮助我理解这一点

我不了解减少/整合的工作原理吗? 说我有一个简单的例子,例如

input_df = spark.read.parquet("Big_folder_having parquets")

# Spark loads and during reading partitions = as per number of files * number of 128MB blocks.

# Now I do a Simple Aggergation/Count 

input_df.createOrReplaceTempView("table1")

grouped_df = spark.sql("select key1,key2,count(1) as user_count from table1 group by 1,2")

# And simply write it with default 200 parallelism

grouped_df.write.format("parquet").mode("overwrite").save(my_save_path)

因此对于输入负载父级rdd /输入映射舞台具有 22394 个分区

据我了解,每个映射器都会创建一个 shuflle数据和索引文件

现在下一个阶段仅具有 200 个任务(默认随机播放分区

这200个化简器/任务如何处理22394个映射器任务的输出

附加了 DAG屏幕截图

Apache Spark:200个Reducer任务如何聚合20000+个映射器输出?

pcluckbird 回答:Apache Spark:200个Reducer任务如何聚合20000+个映射器输出?

您有一个包含40个核心的集群。

会发生什么:

您要求Spark读取目录中的文件,它将一次执行40个任务(因为这是您获得的内核数),结果将是具有22,394个分区的RDD。 (请注意随机溢出。请检查阶段详细信息。)

然后,您要求Spark通过一些键对数据进行分组,然后将其写出。

由于默认的随机分区为200,Spark会将数据从22,394分区“移动”到200个分区中,并一次处理40个任务/分区。

换句话说...

当您请求分组并保存Spark时,将创建计划(我建议您调查物理和逻辑计划),并显示以下内容:“为了执行用户要求的操作,我将创建200个任务将针对数据执行”

然后,执行者将一次执行40个任务。

本身没有映射器或缩减器。

Spark将创建一些任务,并且执行者将执行这些任务。

编辑:

忘记了,RDD中的分区数量将决定输出文件的数量。

,

如果您有10个装有10个苹果的水桶或1个装有100个苹果的水桶,则苹果总数相同。

询问它如何处理类似于询问如何携带10个桶或携带1个桶。

它会做还是不做,取决于您拥有的数据量。您可能遇到的问题是数据溢出到磁盘上,因为当拥有200个分区时,每个分区都需要处理可能不一定适合内存的更多数据。

本文链接:https://www.f2er.com/3148751.html

大家都在问