我有一个带有嵌套 for 循环的 spark 作业。我使用嵌套 for 循环的原因是我有一个非常大的数据集,它被分区为 modules
并且我不想将所有分区加载到内存中来完成我的工作。我有 8
模块,伪代码是这样的:
inpath="s3://path"
modules=fs.ls(inpath)
for path in modules:
modulepath='s3://' +path+ '/'
module=path.split('=')[1]
df=spark.read.option("basePath",inpath).parquet(modulepath)
df=df.persist(Storagelevel.MEMORY_AND_DISK)
inter_df=[]
iter=[1,2,3]
for i in iter:
#perform calculations that involve slicing and dicing df for 3 iterations
df2=slice_n_dice(df)
df2=df2.persist(Storagelevel.MEMORY_AND_DISK)
df3=cut_n_whip(df)
df3=df3.persist(Storagelevel.MEMORY_AND_DISK)
df4=mix_n_mash(df2,df3)
inter_df.append(df4)
final_df=reduce(DataFrame.unionByName,inter_df)
fina_df=final_df.persist(Storagelevel.MEMORY_AND_DISK)
final_df=fina_df.withColumn() #Adding a bunch of calculated columns to final_df
final_df.write.parquet("s3//filename_" +module +".parquet")
final_df.unpersist()
df3.unpersist()
df2.unpersist()
我有很多 persists
的原因是我的数据很大,在分析 DAG 时,我发现它重新计算了很多并最终丢失了节点。通过使用 persist()
,我能够让它发挥作用。
然而,当我运行作业并查看 CPU 负载和内存时,即使我使用了 unpersist()
从上述 Ganglia 的 CPU 负载可以看出,8 个循环按预期进行。但是,在内存图中,我没有看到在每个外循环之后发生任何非持久化。缓存只是不断增加,直到它似乎完成了工作。
我是否理解错误或以错误的方式使用 unpersist
?
任何指针将不胜感激!
谢谢