scala – Spark – 如何将rdd的前N个作为新的rdd(不收集驱动程序)

前端之家收集整理的这篇文章主要介绍了scala – Spark – 如何将rdd的前N个作为新的rdd(不收集驱动程序)前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我想知道如何过滤具有前N个值之一的RDD.通常我会对RDD进行排序并将前N个项目作为驱动程序中的数组来查找可以广播的第N个值来过滤rdd,如下所示:

val topNvalues = sc.broadcast(rdd.map(_.fieldToThreshold).distict.sorted.take(N))
val threshold = topNvalues.last
val rddWithTopNValues = rdd.filter(_.fieldToThreshold >= threshold)

但是在这种情况下我的N太大了,所以我怎么能用RDD这样做呢?:

def getExpensiveItems(itemPrices: RDD[(Int,Float)],count: Int): RDD[(Int,Float)] = {
     val sortedPrices = itemPrices.sortBy(-_._2).map(_._1).distinct

     // How to do this without collecting results to driver??
     val highPrices = itemPrices.getTopNValuesWithoutCollect(count)

     itemPrices.join(highPrices.keyBy(x => x)).map(_._2._1)
}

解决方法

在已排序的rdd上使用zipWithIndex,然后按索引过滤最多n个项目.为了说明这种情况,请考虑将此rrd按降序排序,

val rdd = sc.parallelize((1 to 10).map( _ => math.random)).sortBy(-_)

然后

rdd.zipWithIndex.filter(_._2 < 4)

提供前四个项目而不将rdd收集到驱动程序.

猜你在找的Scala相关文章