我有:
RDD1具有我要比较的成对点
(2,5),(3,7),...
和具有每个点尺寸的RDD2
(0,List(5,7)),(1,List(2,4)),...
如何比较第二个rdd的尺寸,以便比较第一个rdd的对?
(rdds都很大,我无法收集它们)
(join不适用于其他rdd模式)
https://www.mdpi.com/1999-4893/12/8/166/htm#B28-algorithms-12-00166
我有:
RDD1具有我要比较的成对点
(2,5),(3,7),...
和具有每个点尺寸的RDD2
(0,List(5,7)),(1,List(2,4)),...
如何比较第二个rdd的尺寸,以便比较第一个rdd的对?
(rdds都很大,我无法收集它们)
(join不适用于其他rdd模式)
https://www.mdpi.com/1999-4893/12/8/166/htm#B28-algorithms-12-00166
添加了用于加入行的示例,希望它对您有用。 还要找到占位符,您可以在其中添加/修改代码以添加逻辑
import org.apache.spark.sql.functions._
import scala.collection.mutable
object JoinRdds {
def main(args: Array[String]): Unit = {
val spark = Constant.getSparkSess
import spark.implicits._
var df1 = List((2,5),(3,7)).toDF("x","y") // 1st Dataframe
val df2 = List((0,List(5,7)),(1,List(2,4))).toDF("id","coordinates") // 2nd Dataframe
df1 = df1.withColumn("id",monotonically_increasing_id()) // Add row num to 1st DF
// df2.join(df1,df1("id") === df2("id")) // perform inner join
// .drop("id") // drop the id column
// .show(false)
val rdd = df2.join(df1,df1("id") === df2("id")).rdd // here's your RDD you can
val resultCoordinates : Array[(Int,Int)] = rdd.map(row => { // Iterate the result row by row
// you can do all sort of operations per row return any type here.
val coordinates = row.getAs[mutable.WrappedArray[Int]]("coordinates")
val x = row.getAs[Integer]("x")
val y = row.getAs[Integer]("y")
(coordinates(0) - x,coordinates(1) - y )
}).collect() // the collect call on the output
resultCoordinates.foreach(r => println(s"(${r._1},${r._2})")) // printing the output result
}
}