没有GroupBy的Pyspark SQL Pandas分组地图? 阶段1 阶段2

我有一个数据集,我想使用多个Pyspark SQL Grouped Map UDF在AWS EMR中的临时集群上运行的较大ETL过程的不同阶段映射。分组地图API要求在应用之前先将Pyspark数据框分组,但是我实际上不需要分组密钥。

此刻,我正在使用任意分组,该分组有效,但导致:

  1. 不必要的洗牌。

  2. 每个作业中任意groupby的错误代码。

我的理想解决方案是应用矢量化的熊猫UDF,而无需进行任意分组,但是如果我可以保存任意分组,则至少可以消除混洗。

编辑

这是我的代码的样子。我最初使用的是任意分组,但目前正在根据@pault的以下注释尝试spark_partition_id()


@pandas_udf(b_schema,PandasUDFType.GROUPED_MAP)
def transform(a_partition):
  b = a_partition.drop("pid",axis=1)
  # Some other transform stuff
  return b

(sql
  .read.parquet(a_path)
  .withColumn("pid",spark_partition_id())
  .groupBy("pid")
  .apply(transform)
  .write.parquet(b_path))

使用spark_partition_id()似乎仍然会造成随机播放。我得到以下DAG:

阶段1

  1. 扫描实木复合地板
  2. 项目
  3. 项目
  4. 交换

阶段2

  1. 交换
  2. 排序
  3. flatMapGroupsInPandas
yc857622872 回答:没有GroupBy的Pyspark SQL Pandas分组地图? 阶段1 阶段2

要支持大致等效的逻辑(功能(pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame),您必须切换到Spark 3.0.0并使用MAP_ITER转换。

在最新的预览版本(3.0.0-preview2)中,您将需要UDF:

@pandas_udf(b_schema,PandasUDFType.MAP_ITER)
def transform(dfs):
    for df in dfs:
        b = df.drop("pid",axis=1)
        ...
        yield b

df.mapInPandas(transform)

,并在即将发布的3.0.0版本(SPARK-28264)中只是一个简单的功能:

def transform(dfs):
    for df in dfs:
        b = df.drop("pid",axis=1)
        # Some other transform stuff
        ...
        yield b

df.mapInPandas(transform,b_schema)

在2.x上可能的解决方法是使用普通的SCALAR UDF,将结果的每一行序列化为JSON,然后在另一侧反序列化,即

import json
from pyspark.sql.functions import from_json

@pandas_udf("string",PandasUDFType.SCALAR)
def transform(col1,col2):
    b = pd.DataFrame({"x": col1,"y": col2})
    ...
    return b.apply(lambda x: json.dumps(dict(zip(df.columns,x))),axis=1)


(df
    .withColumn("json_result",transform("col1","col2"))
    .withColumn("a_struct",from_json("json_result",b_schema)))
本文链接:https://www.f2er.com/3150180.html

大家都在问