我正在尝试使用一个累加器在多个任务之间共享一个值,但是当传递给map时,该累加器的值似乎会发生变化。
我正在使用pyspark并编写了以下非常基本的代码,以找出我遇到的问题。
sc = spark.sparkContext
rdd = sc.parallelize([0,1,0])
for i in range(5):
rdd.foreach(lambda x: ma.add(x))
m = ma.value
print("m>",m)
rdd = rdd.map(lambda x: x + m)
print(rdd.collect())
ma = sc.accumulator(0.0)
我得到的输出:
m> 2.0
[2.0,3.0,2.0,2.0]
m> 12.0
[24.0,25.0,24.0,24.0]
m> 122.0
[366.0,367.0,366.0,366.0]
m> 1832.0
[7328.0,7329.0,7328.0,7328.0]
m> 36642.0
[183210.0,183211.0,183210.0,183210.0]
我希望将看起来正确的值m添加到每个条目中,以便第二个输出为
[14.0,15.0,14.0,14.0]
相反,似乎在每个阶段我们都将m * i添加到原始数据[2*12,1 + 2*12,2*12,2*12]
等中。