为什么火花计数动作分三个阶段执行

我已经加载了一个csv文件。将其重新分区为4,然后对DataFrame进行计数。当我查看DAG时,我看到此操作分3个阶段执行。

为什么火花计数动作分三个阶段执行

为什么将这个简单的动作分为3个阶段执行。我想第一阶段是加载文件,第二阶段是找到每个分区的计数。

那么第三阶段会发生什么?

这是我的代码

val sample = spark.read.format("csv").option("header","true").option("inferSchema","true").option("delimiter",";").load("sample_data.csv")

sample.repartition(4).count()
fythlhan 回答:为什么火花计数动作分三个阶段执行

  1. 第一阶段=读取文件。由于存在重新分区(因为它需要进行改组,因此无法进行广泛的转换),因此无法将其与partial_count(第二阶段)一起加入单阶段

  2. 第二阶段=本地计数(计算每个分区的计数)

  3. 第三阶段=在驱动程序上进行结果聚合。

火花产生的每个动作或广泛的转变是独立的阶段。要获得有关窄/宽转换以及为什么宽转换需要单独阶段的更多详细信息,请查看"Wide Versus Narrow Dependencies,High Performance Spark,Holden Karau"this article

让我们在本地测试此假设。首先,您需要创建一个数据集:

dataset / test-data.json

[
  { "key":  1,"value":  "a" },{ "key":  2,"value":  "b" },{ "key":  3,"value":  "c" },{ "key":  4,"value":  "d" },{ "key":  5,"value":  "e" },{ "key":  6,"value":  "f" },{ "key":  7,"value":  "g" },{ "key":  8,"value":  "h" }
]

比运行以下代码:

    StructType schema = new StructType()
            .add("key",DataTypes.IntegerType)
            .add("value",DataTypes.StringType);

    SparkSession session = SparkSession.builder()
            .appName("sandbox")
            .master("local[*]")
            .getOrCreate();

    session
            .read()
            .schema(schema)
            .json("file:///C:/<you_path>/dataset")
            .repartition(4) // comment on the second run
            .registerTempTable("df");

    session.sqlContext().sql("SELECT COUNT(*) FROM df").explain();

输出将是:

== Physical Plan ==
*(3) HashAggregate(keys=[],functions=[count(1)])
+- Exchange SinglePartition
   +- *(2) HashAggregate(keys=[],functions=[partial_count(1)])
      +- Exchange RoundRobinPartitioning(4)
         +- *(1) FileScan json [] Batched: false,Format: JSON,Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset],PartitionFilters: [],PushedFilters: [],ReadSchema: struct<>

但是,如果您注释/删除.repartition(4)字符串,请注意TableScan和partial_count是在单个阶段内完成的,输出将如下所示:

== Physical Plan ==
*(2) HashAggregate(keys=[],functions=[count(1)])
+- Exchange SinglePartition
   +- *(1) HashAggregate(keys=[],functions=[partial_count(1)])
      +- *(1) FileScan json [] Batched: false,ReadSchema: struct<>

P.S。请注意,额外的阶段可能会对性能产生重大影响,因为它需要磁盘I / O(请看here),并且某种同步障碍会影响并行化,这意味着在大多数情况下,Spark不会启动阶段2直到第一阶段完成。即使repartition提高并行度也很值得。

本文链接:https://www.f2er.com/3151980.html

大家都在问