-
第一阶段=读取文件。由于存在重新分区(因为它需要进行改组,因此无法进行广泛的转换),因此无法将其与partial_count(第二阶段)一起加入单阶段
-
第二阶段=本地计数(计算每个分区的计数)
-
第三阶段=在驱动程序上进行结果聚合。
火花产生的每个动作或广泛的转变是独立的阶段。要获得有关窄/宽转换以及为什么宽转换需要单独阶段的更多详细信息,请查看"Wide Versus Narrow Dependencies,High Performance Spark,Holden Karau"或this article。
让我们在本地测试此假设。首先,您需要创建一个数据集:
dataset / test-data.json
[
{ "key": 1,"value": "a" },{ "key": 2,"value": "b" },{ "key": 3,"value": "c" },{ "key": 4,"value": "d" },{ "key": 5,"value": "e" },{ "key": 6,"value": "f" },{ "key": 7,"value": "g" },{ "key": 8,"value": "h" }
]
比运行以下代码:
StructType schema = new StructType()
.add("key",DataTypes.IntegerType)
.add("value",DataTypes.StringType);
SparkSession session = SparkSession.builder()
.appName("sandbox")
.master("local[*]")
.getOrCreate();
session
.read()
.schema(schema)
.json("file:///C:/<you_path>/dataset")
.repartition(4) // comment on the second run
.registerTempTable("df");
session.sqlContext().sql("SELECT COUNT(*) FROM df").explain();
输出将是:
== Physical Plan ==
*(3) HashAggregate(keys=[],functions=[count(1)])
+- Exchange SinglePartition
+- *(2) HashAggregate(keys=[],functions=[partial_count(1)])
+- Exchange RoundRobinPartitioning(4)
+- *(1) FileScan json [] Batched: false,Format: JSON,Location: InMemoryFileIndex[file:/C:/Users/iaroslav/IdeaProjects/sparksandbox/src/main/resources/dataset],PartitionFilters: [],PushedFilters: [],ReadSchema: struct<>
但是,如果您注释/删除.repartition(4)字符串,请注意TableScan和partial_count是在单个阶段内完成的,输出将如下所示:
== Physical Plan ==
*(2) HashAggregate(keys=[],functions=[count(1)])
+- Exchange SinglePartition
+- *(1) HashAggregate(keys=[],functions=[partial_count(1)])
+- *(1) FileScan json [] Batched: false,ReadSchema: struct<>
P.S。请注意,额外的阶段可能会对性能产生重大影响,因为它需要磁盘I / O(请看here),并且某种同步障碍会影响并行化,这意味着在大多数情况下,Spark不会启动阶段2直到第一阶段完成。即使repartition
提高并行度也很值得。
本文链接:https://www.f2er.com/3151980.html