为什么即使使用 unpersist 也不会触发卸载内存

我有一个带有嵌套 for 循环的 spark 作业。我使用嵌套 for 循环的原因是我有一个非常大的数据集,它被分区为 modules 并且我不想将所有分区加载到内存中来完成我的工作。我有 8 模块,伪代码是这样的:

inpath="s3://path"
modules=fs.ls(inpath)

for path in modules:
    modulepath='s3://' +path+ '/'
    module=path.split('=')[1]
    df=spark.read.option("basePath",inpath).parquet(modulepath)
    df=df.persist(Storagelevel.MEMORY_AND_DISK)
    inter_df=[]

    iter=[1,2,3]    
    for i in iter:
        #perform calculations that involve slicing and dicing df for 3 iterations
        df2=slice_n_dice(df)
        df2=df2.persist(Storagelevel.MEMORY_AND_DISK)

        df3=cut_n_whip(df)
        df3=df3.persist(Storagelevel.MEMORY_AND_DISK)

        df4=mix_n_mash(df2,df3)
        inter_df.append(df4)

    final_df=reduce(DataFrame.unionByName,inter_df)
    fina_df=final_df.persist(Storagelevel.MEMORY_AND_DISK)
    final_df=fina_df.withColumn() #Adding a bunch of calculated columns to final_df

    final_df.write.parquet("s3//filename_" +module +".parquet")

    final_df.unpersist()
    df3.unpersist()
    df2.unpersist()

我有很多 persists 的原因是我的数据很大,在分析 DAG 时,我发现它重新计算了很多并最终丢失了节点。通过使用 persist(),我能够让它发挥作用。

然而,当我运行作业并查看 CPU 负载和内存时,即使我使用了 unpersist()

,我也没有看到每次外循环后内存被清除

为什么即使使用 unpersist 也不会触发卸载内存

为什么即使使用 unpersist 也不会触发卸载内存

从上述 Ganglia 的 CPU 负载可以看出,8 个循环按预期进行。但是,在内存图中,我没有看到在每个外循环之后发生任何非持久化。缓存只是不断增加,直到它似乎完成了工作。

我是否理解错误或以错误的方式使用 unpersist? 任何指针将不胜感激!

谢谢

yhl19870621 回答:为什么即使使用 unpersist 也不会触发卸载内存

你可以参考这篇文章How to make sure my DataFrame frees its memory?

和火花文档http://spark.apache.org/docs/latest/rdd-programming-guide.html#removing-data

enter image description here

我的理解是,unpersist 仅标记要删除的数据帧,但除非您使用 (blocking=true) 并等待删除完成,否则它不会立即从内存中删除。

本文链接:https://www.f2er.com/238.html

大家都在问