Pyspark RDD的平均间隔

我正在尝试使用PySpark查找相邻元组列表之间的平均差。

例如,如果我有这样的RDD

vals = [(2,110),(2,130),120),(3,200),206),(4,150),160),170)]

我想找到每个键的平均差异。

例如,键值“ 2”

平均差异为(abs(110-130)+ abs(130-120))/ 2 = 15。

到目前为止,这是我的方法。我正在尝试更改平均计算代码以适应此要求。但这似乎不起作用。

from pyspark import SparkContext
aTuple = (0,0)
interval = vals.aggregateByKey(aTuple,lambda a,b: (abs(a[0] - b),a[1] + 1),b: (a[0] + b[0],a[1] + b[1]))
finalResult = interval.mapValues(lambda v: (v[0]/v[1])).collect()

我想使用RDD函数,不使用Spark SQL或任何其他附加程序包来完成此操作。

什么是最好的方法?

如有任何疑问,请告诉我。

谢谢您的时间。

wuhaizhonggd 回答:Pyspark RDD的平均间隔

我想出了一个幼稚的方法。我不确定这是否在所有情况下都适用。像这样。

首先让我们提供一个函数来计算移动平均值。如果这不是计算移动平均线的正确方法,请纠正我。

def get_abs(num_list):
    '''
    >>> get_abs([110,130,120])
    15.0
    '''
    acc = 0
    num_pairs = 0
    for i in range(len(num_list)-1):
        acc += abs(num_list[i]-num_list[i+1])
        num_pairs +=1
    return acc/num_pairs

接下来,我们将列表并行化

>>> vals = [(2,110),(2,130),120),(3,200),206),(4,150),160),170)]
>>> rdd = sc.parallelize(vals)
>>> rdd.collect()
[(2,170)]

然后,将属于同一列表的值分组。

>>> vals = rdd.groupByKey().mapValues(list)
>>> vals.collect()
[(4,[150,160,170]),[110,120]),[200,206,206])]

然后,我们只需要调用上面定义的函数即可计算分组值的移动平均值。

>>> vals.mapValues(get_abs).collect()
[(4,10.0),15.0),3.0)]
本文链接:https://www.f2er.com/3131169.html

大家都在问