我有一个数据集,我想使用多个Pyspark SQL Grouped Map UDF在AWS EMR中的临时集群上运行的较大ETL过程的不同阶段映射。分组地图API要求在应用之前先将Pyspark数据框分组,但是我实际上不需要分组密钥。
此刻,我正在使用任意分组,该分组有效,但导致:
-
不必要的洗牌。
-
每个作业中任意groupby的错误代码。
我的理想解决方案是应用矢量化的熊猫UDF,而无需进行任意分组,但是如果我可以保存任意分组,则至少可以消除混洗。
编辑:
这是我的代码的样子。我最初使用的是任意分组,但目前正在根据@pault的以下注释尝试spark_partition_id()
。
@pandas_udf(b_schema,PandasUDFType.GROUPED_MAP)
def transform(a_partition):
b = a_partition.drop("pid",axis=1)
# Some other transform stuff
return b
(sql
.read.parquet(a_path)
.withColumn("pid",spark_partition_id())
.groupBy("pid")
.apply(transform)
.write.parquet(b_path))
使用spark_partition_id()
似乎仍然会造成随机播放。我得到以下DAG:
阶段1
- 扫描实木复合地板
- 项目
- 项目
- 交换
阶段2
- 交换
- 排序
- flatMapGroupsInPandas