我目前正在研究一个项目,我很难理解PySpark中的Pandas UDF是如何工作的。
我有一个Spark群集,其中一个主节点具有8个内核和64GB,以及两个16个内核和112GB的工作线程。我的数据集非常大,分为七个主要分区,每个分区约有7800万行。数据集包含70列。 我定义了一个Pandas UDF来对数据集执行一些操作,而这只能使用Python在Pandas数据帧上完成。
熊猫UDF是这样定义的:
@pandas_udf(schema,PandasUDFType.GROUPED_MAP)
def operation(pdf):
#Some operations
return pdf
spark.table("my_dataset").groupBy(partition_cols).apply(operation)
在进行操作之前,绝对没有办法让Pandas UDF崩溃。我怀疑某处存在OOM错误。上面的代码运行了几分钟,然后崩溃,并显示一条错误代码,指出该连接已重置。 但是,如果我在一个分区上过滤后调用.toPandas()函数,然后显示它,则它运行良好,没有错误。该错误似乎仅在使用PandasUDF时发生。
我不明白它是如何工作的。 Spark是否尝试一次转换一个整个分区(78M行)?如果是这样,它将使用什么内存?驱动程序内存?执行者的?如果在驱动程序上,是否在其上执行了所有Python代码?
集群配置如下:
- SPARK_WORKER_CORES = 2
- SPARK_WORKER_MEMORY = 64克
- spark.executor.cores 2
- spark.executor.memory 30克(以便为python实例提供内存)
- spark.driver.memory 43克
我丢失了什么东西还是无法通过PandasUDF运行7800万行?