在大型数据集上运行Pandas UDF时出现问题

我目前正在研究一个项目,我很难理解PySpark中的Pandas UDF是如何工作的。

我有一个Spark群集,其中一个主节点具有8个内核和64GB,以及两个16个内核和112GB的工作线程。我的数据集非常大,分为七个主要分区,每个分区约有7800万行。数据集包含70列。 我定义了一个Pandas UDF来对数据集执行一些操作,而这只能使用Python在Pandas数据帧上完成。

熊猫UDF是这样定义的:

@pandas_udf(schema,PandasUDFType.GROUPED_MAP)
def operation(pdf):
   #Some operations
   return pdf

spark.table("my_dataset").groupBy(partition_cols).apply(operation)

在进行操作之前,绝对没有办法让Pandas UDF崩溃。我怀疑某处存在OOM错误。上面的代码运行了几分钟,然后崩溃,并显示一条错误代码,指出该连接已重置。 但是,如果我在一个分区上过滤后调用.toPandas()函数,然后显示它,则它运行良好,没有错误。该错误似乎仅在使用PandasUDF时发生。

我不明白它是如何工作的。 Spark是否尝试一次转换一个整个分区(78M行)?如果是这样,它将使用什么内存?驱动程序内存?执行者的?如果在驱动程序上,是否在其上执行了所有Python代码?

集群配置如下:

  • SPARK_WORKER_CORES = 2
  • SPARK_WORKER_MEMORY = 64克
  • spark.executor.cores 2
  • spark.executor.memory 30克(以便为python实例提供内存)
  • spark.driver.memory 43克

我丢失了什么东西还是无法通过PandasUDF运行7800万行?

fankaoquan 回答:在大型数据集上运行Pandas UDF时出现问题

  

Spark是否尝试一次转换整个分区(7800万行)?

这正是发生的情况。 Spark 3.0增加了对分块UDF的支持,该分块UDF可在Pandas DataFramesSeries的迭代器上运行,但是如果对数据集进行操作,则只能在Pandas数据帧上使用Python进行操作,这些可能不是您的正确选择。

  

如果是,它将使用什么内存?驱动程序内存?遗嘱执行人的?

每个分区在各自的执行器上进行本地处理,并使用Arrow流将数据传递到Python worker和从Python worker传递数据。

  

我是否缺少某些东西,还是无法通过PandasUDF运行7800万行?

只要您有足够的内存来处理Arrow输入,输出(尤其是复制数据),辅助数据结构以及JVM开销,它就可以很好地处理大型数据集。

但是在如此小的集群上,最好使用熊猫对输出进行分区并直接读取数据,而不用使用Spark会更好。这样,您将能够使用所有可用资源(即> 100GB /解释器)进行数据处理,而不是将这些资源浪费在次要任务上(具有16GB的开销/解释器)。

本文链接:https://www.f2er.com/2856374.html

大家都在问