I am trying to pass a large (~30 GB) DataFrame to a pandas_udf like this:
@f.pandas_udf(gen_udf_schema(), f.PandasUDFType.GROUPED_MAP)
def _my_udf(df):
    # ... do df work ...
    return df

df = df.groupBy('some_col').apply(_my_udf)
I have tried increasing executor memory, driver memory, and driver maxResultSize, but I still get the pyarrow memory error detailed below on the cluster. Is there an executor-side equivalent of driver maxResultSize that I could use to avoid this error? There doesn't seem to be much information about this online.
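For reference, the settings I increased were along these lines (the values here are illustrative, not my exact ones):

```shell
spark-submit \
  --executor-memory 16g \
  --driver-memory 16g \
  --conf spark.driver.maxResultSize=8g \
  my_job.py
```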
I can't split the DataFrame up, because it is actually the union of 1 small DataFrame and 1 large (~29 GB) DataFrame. Inside the udf I separate the two, do my work, and return only the small DataFrame.
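Inside the udf, the separation looks roughly like this (a minimal pandas sketch; the `is_small` flag column and the enrichment step are illustrative placeholders, not my real schema):

```python
import pandas as pd

def _my_udf(pdf: pd.DataFrame) -> pd.DataFrame:
    # The grouped frame is the union of a small and a large DataFrame;
    # a hypothetical 'is_small' flag tells the two apart.
    small = pdf[pdf["is_small"]]
    big = pdf[~pdf["is_small"]]
    # ... do the real work here, using `big` to enrich `small` ...
    # Only the small frame is returned, so the output stays small.
    return small.drop(columns=["is_small"])

# Tiny local example of one group's input:
sample = pd.DataFrame({
    "some_col": ["a", "a", "a"],
    "val": [1, 2, 3],
    "is_small": [True, False, False],
})
result = _my_udf(sample)
```

The point is that even though only the small frame comes back, the full ~30 GB group still has to be serialized through Arrow on the way *in* to the udf, which is where the error below occurs.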
py4j.protocol.Py4JJavaError: An error occurred while calling o324.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 44 in stage 19.0 failed 4 times, most recent failure: Lost task 44.3 in stage 19.0 (TID 368, ip-172-31-13-57.us-west-2.compute.internal, executor 3): org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
at org.apache.arrow.vector.BaseVariableWidthVector.reallocBufferHelper(BaseVariableWidthVector.java:547)
at org.apache.arrow.vector.BaseVariableWidthVector.reallocValidityAndOffsetBuffers(BaseVariableWidthVector.java:529)
at org.apache.arrow.vector.BaseVariableWidthVector.handleSafe(BaseVariableWidthVector.java:1221)
at org.apache.arrow.vector.BaseVariableWidthVector.fillEmpties(BaseVariableWidthVector.java:881)
at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1062)
at org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:242)
at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:121)
at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:86)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:85)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:76)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:76)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:96)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)