Spark 2.4.0
rdd = rdd.cache()
print(rdd.getStorageLevel())
Memory Serialized 1x Replicated
sc.setCheckpointDir("/tmp/checkpoints")
rdd.checkpoint()
An action on the rdd:
rdd.count()
25066
Checking whether it is checkpointed:
rdd.isCheckpointed()
False
print(rdd.getCheckpointFile())
None
I tested on Spark 2.4.4 (EMR 5.28) and it works.
It may be a permissions or configuration issue in your EMR setup, like what I ran into when I tried this earlier on Spark 2.4.3; I don't see anything related to checkpointing in the 2.4.4 release notes.
df = spark.range(1,7,2)
df.show()
rdd = df.rdd
rdd = rdd.cache()
print("Storage Level - {}".format(rdd.getStorageLevel()))
print("Is Checkpointed - {}".format(rdd.isCheckpointed()))
print("Checkpoint File - {}".format(rdd.getCheckpointFile()))
# Setting HDFS directory
sc.setCheckpointDir("/tmp/checkpoint_dir/")
rdd.checkpoint()
print("Is Checkpointed - {}".format(rdd.isCheckpointed()))
print("Checkpoint File - {}".format(rdd.getCheckpointFile()))
# Calling an action
print("count - {}".format(rdd.count()))
print("Is Checkpointed - {}".format(rdd.isCheckpointed()))
print("Checkpoint File - {}".format(rdd.getCheckpointFile()))
Output:
+---+
| id|
+---+
| 1|
| 3|
| 5|
+---+
Storage Level - Memory Serialized 1x Replicated
Is Checkpointed - False
Checkpoint File - None
Is Checkpointed - False
Checkpoint File - None
count - 3
Is Checkpointed - True
Checkpoint File - hdfs://ip-xx-xx-xx-xx.ec2.internal:8020/tmp/checkpoint_dir/5d3bf642-cc17-4ffa-be10-51c58b8f5fcf/rdd-9
I tested with Spark 2.4.2 on a standalone cluster. Checkpointing works there too.
spark.sparkContext.setCheckpointDir("temp/")
val textFile=spark.sparkContext.textFile("test1.txt")
println("textFile.isCheckpointed = " + textFile.isCheckpointed)
textFile.checkpoint()
println("textFile.count() = " + textFile.count())
println("textFile.isCheckpointed = " + textFile.isCheckpointed)
Result:
textFile.isCheckpointed = false
textFile.count() = 8
textFile.isCheckpointed = true