是否有其他方法可以在Spark中进行迭代联接-scala

用例是在给定的列中找到n个最大行(这些列可以是n个列),一旦拥有n个键,便将其重新连接到原始数据集以获取所需的所有行>

val df = Seq((“ 12”,“ Tom”,“ Hanks”),     (“ 13”,“ Meryl”,“ Streep”), (“ 12”,“汤姆”,“哈迪”), (“ 12”,“ John”,“ Streep”) ).toDF(“ age”,“ firstname”,“ lastname”)

让我们说我想将每个列与包含上面所有三列的较大的actor数据集单独连接。

val v1 = actors.join(df,Seq("id"),"inner")
val v2 =actors.join(df,Seq("firstname"),"inner")
val v3 =actors.join(df,Seq("lastname"),"inner")
val output = v1.union(v2).union(v3)

有什么方法可以不重复执行此操作?同样因为要连接的列可以是动态的。例如,有时它只能是id,或者只能是id和firstname。

totomy 回答:是否有其他方法可以在Spark中进行迭代联接-scala

您可以尝试其他方法,因此可以通过以下方式实现:

actors.join(df).where(
actors("id") === df("id") || 
actors("firstname") === df("firstname") || 
actors("lastname") === df("lastname")
)

对于n列,您可以尝试:

  val joinCols = Seq("id","firstname","lastname") // or actors.columns
  val condition = joinCols
    .map(s => (actors(s) === df(s)))
    .reduce((a,b) => a || b)

您将获得以下条件:

condition.explain(true)
(((a#7 = a#7) || (b#8 = b#8)) || (c#9 = c#9))

最后使用它:

   actors.join(df).where(condition)
,

我认为使用较小的数据集进行广播,并使用udf检查较大的数据集将解决此问题。我一直在思考联接的问题!

,

@chlebek解决方案应该可以很好地工作,这是您要重现初始逻辑的另一种方法:

val cols = Seq("id","lastname")

val final_df = cols.map{
     df.join(actors,Seq(_),"inner") 
}
.reduce(_ union _)

首先,我们为每列生成一个内部联接,然后我们联合

本文链接:https://www.f2er.com/3158625.html

大家都在问