我在分区后查询出现性能问题。
我每天有一个大约 3000 万行和 20 列的镶木地板文件。例如,文件 data_20210721.parquet
看起来像:
+-----------+---------------------+---------------------+------------+-----+
| reference | date_from | date_to | daytime | ... |
+-----------+---------------------+---------------------+------------+-----+
| A | 2021-07-21 17:30:25 | 2021-07-22 02:21:57 | 2021-07-22 | ... |
| A | 2021-07-21 12:10:10 | 2021-07-21 13:00:00 | 2021-07-21 | ... |
| A | ... | ... | ... | ... |
+-----------+---------------------+---------------------+------------+-----+
我们有一个代码来处理它,让它只有一天,并减少一个午夜,这样我们就有:
+-----------+---------------------+---------------------+------------+-----+
| reference | date_from | date_to | daytime | ... |
+-----------+---------------------+---------------------+------------+-----+
| A | 2021-07-21 17:30:25 | 2021-07-22 00:00:00 | 2021-07-21 | ... | <- split at midnight + daytime update
| A | 2021-07-22 00:00:00 | 2021-07-22 02:21:57 | 2021-07-22 | ... | <- residual
| A | 2021-07-21 12:10:10 | 2021-07-21 13:00:00 | 2021-07-21 | ... |
| A | ... | ... | ... | ... |
+-----------+---------------------+---------------------+------------+-----+
行2,可以称为残差,因为它与文件不在同一天。
然后我们想每天生成 1 个镶木地板,因此默认的解决方案是处理每个文件并使用以下命令保存数据框:
df.write.partitionBy(["id","daytime"]).mode("append").parquet("hdfs/path")
模式设置为追加,因为第二天,我们可能会有过去/未来几天的残差。
还有其他级别的分区,例如:
- ID:它固定大约一年(保存这样的存储非常好;))
- 周数
- 国家
即使分区在行方面非常“平衡”,处理时间也变得非常缓慢。
例如,要计算给定日期集每天的行数:
- 原始 df(7s 秒):
spark.read.parquet("path/to/data_2021071[0-5].parquet")\
.groupBy("DayTime")\
.count()\
.show()
- 分区数据(几分钟)
spark.read.parquet("path/to/data")\
.filter( (col("DayTime") >= "2021-07-10") & (col("DayTime") <= "2021-07-15") )\
.groupBy("DayTime")\
.count()\
.show()
我们认为最后一级的小分区太多(因为追加,大约有 600 个非常小的文件,只有几个 Kb/Mb),所以我们尝试为每个分区合并它们,但没有改进。我们还尝试仅在 daytime
上进行分区(以防有多个级别的分区会产生问题)。
是否有任何解决方案来提高性能(或了解瓶颈在哪里)?
它可以与我们对 date
列进行分区的事实有关吗?我看到了很多按年/月/日分区的例子,例如 3 个整数但不符合我们的需要。
这个解决方案非常适合解决我们遇到的许多问题,但性能损失太重要而不能保持原样。欢迎任何建议:)
编辑 1:
问题来自于计划不同的事实:
spark.read.parquet("path/to/data/DayTime=2021-07-10")
和
spark.read.parquet("path/to/data/").filter(col("DayTime")=="2021-07-10")
这是一个小例子的计划,其中 DayTime
已转换为“长”,因为我认为可能是由于数据类型造成的缓慢:
spark.read.parquet("path/to/test/").filter(col("ts") == 20200103).explain(extended=True)
== Parsed Logical Plan ==
'Filter ('ts = 20200103)
+- AnalysisBarrier
+- Relation[date_from#4297,date_to#4298,....] parquet
== Analyzed Logical Plan ==
date_from: timestamp,date_to: timestamp,ts: int,....
Filter (ts#4308 = 20200103)
+- Relation[date_from#4297,ts#4308,....] parquet
== Optimized Logical Plan ==
Filter (isnotnull(ts#4308) && (ts#4308 = 20200103))
+- Relation[date_from#4297,....] parquet
== Physical Plan ==
*(1) FileScan parquet [date_from#4297,....] Batched: true,Format: Parquet,Location: InmemoryFileIndex[hdfs://.../test_perf],PartitionCount: 1,PartitionFilters: [isnotnull(ts#4308),(ts#4308 = 20200103)],PushedFilters: [],ReadSchema: struct<date_from:timestamp,date_to:timestamp,....
对比
spark.read.parquet("path/to/test/ts=20200103").explain(extended=True)
== Parsed Logical Plan ==
Relation[date_from#2086,date_to#2087,....] parquet
== Optimized Logical Plan ==
Relation[date_from#2086,....] parquet
== Physical Plan ==
*(1) FileScan parquet [date_from#2086,.....] Batched: true,Location: InmemoryFileIndex[hdfs://.../test_perf/ts=20200103],PartitionFilters: [],....
提前致谢,
尼古拉斯