Spark: is it possible to increase the pyarrow buffer size?

I am trying to pass a large (~30GB) DataFrame into a pandas_udf like this:

import pyspark.sql.functions as f

@f.pandas_udf(gen_udf_schema(), f.PandasUDFType.GROUPED_MAP)
def _my_udf(df):
    # df here is the pandas DataFrame for one 'some_col' group
    # ... do df work ...
    return df

df = df.groupBy('some_col').apply(_my_udf)

I have tried increasing executor memory, driver memory, and the driver's maxResultSize, but I still get the pyarrow memory error detailed below on the cluster. Is there an executor-side equivalent of the driver's maxResultSize that I could use to avoid this error? There doesn't seem to be much information about it online.
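For reference, this is roughly how I have been raising those settings (the values below are only illustrative, not my exact cluster configuration):

from pyspark.sql import SparkSession

# Illustrative values only -- not my actual cluster settings.
# Note: spark.driver.memory normally has to be set at launch time
# (e.g. spark-submit --driver-memory), since the driver JVM is already
# running by the time this code executes.
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "16g")
    .config("spark.driver.memory", "16g")
    .config("spark.driver.maxResultSize", "8g")
    .getOrCreate()
)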

I can't split the DataFrame up, because it is really the union of one small DataFrame and one large (~29GB) DataFrame. Inside the UDF I separate the two, do my work, and then return only the small DataFrame.
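Roughly what happens inside the UDF (a simplified sketch; the 'is_small' marker column is just a placeholder for however I actually tell the two frames apart, and the real work is omitted):

@f.pandas_udf(gen_udf_schema(), f.PandasUDFType.GROUPED_MAP)
def _my_udf(df):
    # df is the pandas DataFrame for one 'some_col' group and contains
    # rows from both the small and the large original frame.
    small = df[df['is_small']]     # hypothetical marker column
    large = df[~df['is_small']]
    # ... use 'large' to do the work, writing results onto 'small' ...
    return small                   # only the small part is returned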

py4j.protocol.Py4JJavaError: An error occurred while calling o324.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 44 in stage 19.0 failed 4 times, most recent failure: Lost task 44.3 in stage 19.0 (TID 368, ip-172-31-13-57.us-west-2.compute.internal, executor 3): org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand the buffer
    at org.apache.arrow.vector.BaseVariableWidthVector.reallocBufferHelper(BaseVariableWidthVector.java:547)
    at org.apache.arrow.vector.BaseVariableWidthVector.reallocValidityAndOffsetBuffers(BaseVariableWidthVector.java:529)
    at org.apache.arrow.vector.BaseVariableWidthVector.handleSafe(BaseVariableWidthVector.java:1221)
    at org.apache.arrow.vector.BaseVariableWidthVector.fillEmpties(BaseVariableWidthVector.java:881)
    at org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1062)
    at org.apache.spark.sql.execution.arrow.StringWriter.setvalue(ArrowWriter.scala:242)
    at org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:121)
    at org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:86)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:85)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:76)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:76)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:96)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)