如何将数组元素映射到Spark Dataframe中的每个记录

我正在处理一个看起来像这样的数据框-

val df = Seq(
(0.0  ),(0.0  ),(0.317),(-0.78),(-0.37),(0.0  )
).toDF("importance")

我现在有更多代码来将labelsfeatures列作为数组,如下所示-

val labels = Array(0,1,2)
import org.apache.spark.sql.functions.typedLit
val df1 = df.withColumn("labels",typedLit(labels))
val featureNames = Array("a","b","c","d")
val df2 = df1.withColumn("features",typedLit(featureNames))

scala> df2.show(false)
+----------+---------+------------+
|importance|labels   |features    |
+----------+---------+------------+
|0.0       |[0,2]|[a,b,c,d]|
|0.0       |[0,d]|
|0.317     |[0,d]|
|-0.78     |[0,d]|
|-0.37     |[0,d]|
+----------+---------+------------+

现在,使用此数据框,我想将重要性列的每个值与labelsfeatures数组的每个元素对齐。所以输出应该看起来像这样-

label feature name  importance
0         a             0      
0         b             0      
0         c             0      
0         d             0.3176 
1         a             0      
1         b             0      
1         c             -0.78  
1         d             -0.37  
2         a             0      
2         b             0      
2         c             0      
2         d             0  

因此,第一条记录具有label=0feature=a,并且具有importance = 0

wangzhenhua7 回答:如何将数组元素映射到Spark Dataframe中的每个记录

根据您的示例,没有足够的信息为我提供确定性的解决方案。由于spark是分布式处理引擎,因此您需要确定性的方式对importance数据帧进行排序才能获得所需的结果。

我认为需要改变方法以获得所需的输出:

  1. CROSS JOIN labelfeature进入labelFeatureDs
  2. 根据您需要rownumber进行的排序添加一列labelFeatureDs(在您的示例中,排序按label然后是feature
  3. 根据您在此处进行的排序,将rownumber添加到importance
  4. INNER JOIN上的importance labelFeatureDsrownumber

代码:

import org.apache.spark.sql.expressions.Window
spark.conf.set("spark.sql.crossJoin.enabled",true)

val importanceDf = Seq(
(0.0  ),(0.0  ),(0.317),(-0.78),(-0.37),(0.0  )
).
toDF("importance").
select(col("*"),row_number().over(Window.partitionBy(lit(null)).orderBy(lit(null))).as("rn"))
importanceDf.show

val labelsDf = Seq(0,1,2).toDF("label")
val featuresDf = Seq("a","b","c","d").toDF("feature")

val labelFeatureDs = labelsDf.join(featuresDf).sort($"label",$"feature").select(col("*"),row_number().over(Window.partitionBy(lit(null)).orderBy(lit(null))).as("rn"))
labelFeatureDs.show

val result = labelFeatureDs.join(importanceDf,"rn")

结果:

scala> result.show
+---+-----+-------+----------+
| rn|label|feature|importance|
+---+-----+-------+----------+
|  1|    0|      a|       0.0|
|  2|    0|      b|       0.0|
|  3|    0|      c|       0.0|
|  4|    0|      d|     0.317|
|  5|    1|      a|       0.0|
|  6|    1|      b|       0.0|
|  7|    1|      c|     -0.78|
|  8|    1|      d|     -0.37|
|  9|    2|      a|       0.0|
| 10|    2|      b|       0.0|
| 11|    2|      c|       0.0|
| 12|    2|      d|       0.0|
+---+-----+-------+----------+
本文链接:https://www.f2er.com/3141634.html

大家都在问