pyspark - 分区数据的计算(使用“追加”模式创建)很慢

我在分区后查询出现性能问题。

我每天有一个大约 3000 万行和 20 列的镶木地板文件。例如,文件 data_20210721.parquet 看起来像:

+-----------+---------------------+---------------------+------------+-----+
| reference |      date_from      |       date_to       |  daytime   | ... |
+-----------+---------------------+---------------------+------------+-----+
| A         | 2021-07-21 17:30:25 | 2021-07-22 02:21:57 | 2021-07-22 | ... |
| A         | 2021-07-21 12:10:10 | 2021-07-21 13:00:00 | 2021-07-21 | ... |
| A         | ...                 | ...                 | ...        | ... |
+-----------+---------------------+---------------------+------------+-----+

我们有一个代码来处理它,让它只有一天,并减少一个午夜,这样我们就有:

+-----------+---------------------+---------------------+------------+-----+
| reference |      date_from      |       date_to       |  daytime   | ... |
+-----------+---------------------+---------------------+------------+-----+
| A         | 2021-07-21 17:30:25 | 2021-07-22 00:00:00 | 2021-07-21 | ... | <- split at midnight + daytime update
| A         | 2021-07-22 00:00:00 | 2021-07-22 02:21:57 | 2021-07-22 | ... | <- residual
| A         | 2021-07-21 12:10:10 | 2021-07-21 13:00:00 | 2021-07-21 | ... |
| A         | ...                 | ...                 | ...        | ... |
+-----------+---------------------+---------------------+------------+-----+

2,可以称为残差,因为它与文件不在同一天。

然后我们想每天生成 1 个镶木地板,因此默认的解决方案是处理每个文件并使用以下命令保存数据框:

df.write.partitionBy(["id","daytime"]).mode("append").parquet("hdfs/path")

模式设置为追加,因为第二天,我们可能会有过去/未来几天的残差。

还有其他级别的分区,例如:

  • ID:它固定大约一年(保存这样的存储非常好;))
  • 周数
  • 国家

即使分区在行方面非常“平衡”,处理时间也变得非常缓慢。

例如,要计算给定日期集每天的行数:

  • 原始 df(7s 秒):
spark.read.parquet("path/to/data_2021071[0-5].parquet")\
.groupBy("DayTime")\
.count()\
.show()
  • 分区数据(几分钟)
spark.read.parquet("path/to/data")\
.filter( (col("DayTime") >= "2021-07-10") & (col("DayTime") <= "2021-07-15") )\
.groupBy("DayTime")\
.count()\
.show()

我们认为最后一级的小分区太多(因为追加,大约有 600 个非常小的文件,只有几个 Kb/Mb),所以我们尝试为每个分区合并它们,但没有改进。我们还尝试仅在 daytime 上进行分区(以防有多个级别的分区会产生问题)。

是否有任何解决方案来提高性能(或了解瓶颈在哪里)? 它可以与我们对 date 列进行分区的事实有关吗?我看到了很多按年/月/日分区的例子,例如 3 个整数但不符合我们的需要。

这个解决方案非常适合解决我们遇到的许多问题,但性能损失太重要而不能保持原样。欢迎任何建议:)

编辑 1:

问题来自于计划不同的事实:

spark.read.parquet("path/to/data/DayTime=2021-07-10")

spark.read.parquet("path/to/data/").filter(col("DayTime")=="2021-07-10")

这是一个小例子的计划,其中 DayTime 已转换为“长”,因为我认为可能是由于数据类型造成的缓慢:

spark.read.parquet("path/to/test/").filter(col("ts") == 20200103).explain(extended=True)

== Parsed Logical Plan ==
'Filter ('ts = 20200103)
+- AnalysisBarrier
      +- Relation[date_from#4297,date_to#4298,....] parquet

== Analyzed Logical Plan ==
date_from: timestamp,date_to: timestamp,ts: int,....
Filter (ts#4308 = 20200103)
+- Relation[date_from#4297,ts#4308,....] parquet

== Optimized Logical Plan ==
Filter (isnotnull(ts#4308) && (ts#4308 = 20200103))
+- Relation[date_from#4297,....] parquet

== Physical Plan ==
*(1) FileScan parquet [date_from#4297,....] Batched: true,Format: Parquet,Location: InmemoryFileIndex[hdfs://.../test_perf],PartitionCount: 1,PartitionFilters: [isnotnull(ts#4308),(ts#4308 = 20200103)],PushedFilters: [],ReadSchema: struct<date_from:timestamp,date_to:timestamp,....

对比

spark.read.parquet("path/to/test/ts=20200103").explain(extended=True)

== Parsed Logical Plan ==
Relation[date_from#2086,date_to#2087,....] parquet

== Optimized Logical Plan ==
Relation[date_from#2086,....] parquet

== Physical Plan ==
*(1) FileScan parquet [date_from#2086,.....] Batched: true,Location: InmemoryFileIndex[hdfs://.../test_perf/ts=20200103],PartitionFilters: [],....

提前致谢,

尼古拉斯

hillingzhou 回答:pyspark - 分区数据的计算(使用“追加”模式创建)很慢

您必须确保您的 filter 实际上是在使用分区结构,在磁盘级别进行修剪,而不是将所有数据放入内存然后应用过滤器。

尝试检查您的身体计划

spark.read.parquet("path/to/data")\
.filter( (col("DayTime") >= "2021-07-10") & (col("DayTime") <= "2021-07-15") )
.explain()

它应该有一个类似于 PartitionFilters: [isnotnull(DayTime#123),(DayTime#76 = your condition)],

的阶段

我的猜测是在您的情况下,它没有使用此 PartitionFilters 并且会扫描整个数据。

我建议尝试使用小数据集试验您的语法/重新分区策略,直到达到 PartitionFilters

本文链接:https://www.f2er.com/8255.html

大家都在问