How to split a column with inconsistent data in Spark

I am trying to join two DataFrames and dynamically create new columns based on the attribute values (or at least trying to).

I need to split a column from the FormulaTable into additional columns and then join the result with the attribute table.

However, I cannot get the column to split dynamically.

I have two questions, which I have marked in bold in the steps below.
Currently, the data in my FormulaTable looks like this.

val attributeFormulaDF = Seq("A0004*A0003","A0003*A0005").toDF("AttributeFormula")  

So the data looks like

+----------------+
|AttributeFormula|
+----------------+
|A0004*A0003     |
|A0003*A0005     |
+----------------+

The attribute data looks like this.

    val attrValTransposedDF = Seq(
      ("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY1_VALUE", "801"),
      ("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY2_VALUE", "802"),
      ("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY3_VALUE", "803"),
      ("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY4_VALUE", "804"),
      ("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY5_VALUE", "805"),
      ("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY6_VALUE", "736"),
      ("2007", 201801, "DEL", "A0003", "NA", "ATTRIB_DAY7_VALUE", "1007"),
      ("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY1_VALUE", "901"),
      ("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY2_VALUE", "902"),
      ("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY3_VALUE", "903"),
      ("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY4_VALUE", "904"),
      ("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY5_VALUE", "905"),
      ("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY6_VALUE", "936"),
      ("2007", 201801, "DEL", "A0004", "NA", "ATTRIB_DAY7_VALUE", "9007"))
      .toDF("Store_Number", "Attribute_Week_Number", "Department_Code", "Attribute_Code", "Attribute_General_Value", "Day", "Value")
      .select("Attribute_Code", "Day", "Value")

So the data looks like

+--------------+-----------------+-----+
|Attribute_Code|Day              |Value|
+--------------+-----------------+-----+
|A0003         |ATTRIB_DAY1_VALUE|801  |
|A0003         |ATTRIB_DAY2_VALUE|802  |
|A0003         |ATTRIB_DAY3_VALUE|803  |
|A0003         |ATTRIB_DAY4_VALUE|804  |
|A0003         |ATTRIB_DAY5_VALUE|805  |
|A0003         |ATTRIB_DAY6_VALUE|736  |
|A0003         |ATTRIB_DAY7_VALUE|1007 |
|A0004         |ATTRIB_DAY1_VALUE|901  |
|A0004         |ATTRIB_DAY2_VALUE|902  |
|A0004         |ATTRIB_DAY3_VALUE|903  |
|A0004         |ATTRIB_DAY4_VALUE|904  |
|A0004         |ATTRIB_DAY5_VALUE|905  |
|A0004         |ATTRIB_DAY6_VALUE|936  |
|A0004         |ATTRIB_DAY7_VALUE|9007 |
+--------------+-----------------+-----+

Now I split it on `*`

    val firstDF = attributeFormulaDF.select("AttributeFormula")
    val rowVal = firstDF.first.mkString.split("\\*").length
    val columnSeq = (0 until rowVal).map(i => col("temp").getItem(i).as(s"col$i"))
    val newDFWithSplitColumn = firstDF.withColumn("temp", split(col("AttributeFormula"), "\\*"))
      .select(col("*") +: columnSeq: _*).drop("temp")

I referred to this Stack Overflow post (Split 1 column into 3 columns in spark scala).

So the split columns look like

+----------------+-----+-----+
|AttributeFormula|col0 |col1 |
+----------------+-----+-----+
|A0004*A0003     |A0004|A0003|
|A0003*A0005     |A0003|A0005|
+----------------+-----+-----+

Question 1: How do I split AttributeFormula dynamically if it can contain any number of attributes (it is just a string)?

eg: 
+-----------------+
|AttributeFormula |
+-----------------+
|A0004            |
|A0004*A0003      |
|A0003*A0005      |
|A0003*A0004      |
|A0003*A0004*A0005|
+-----------------+

So I would get something like this

+-----------------+-----+-----+-----+
|AttributeFormula |col0 |col1 |col2 |
+-----------------+-----+-----+-----+
|A0004            |A0004|null |null |
|A0003*A0005      |A0003|A0005|null |
|A0003*A0004      |A0003|A0004|null |
|A0003*A0004*A0005|A0003|A0004|A0005|
+-----------------+-----+-----+-----+

I then join attributeFormula with the attribute values to get the formula value column.

    val joinColumnCondition = newDFWithSplitColumn.columns
      .filter(_.startsWith("col"))
      .map(col(_) === attrValTransposedDF("Attribute_Code"))

    // using zipWithIndex to keep the value columns separate and to avoid
    // ambiguity errors while joining
    val dataFrameList = joinColumnCondition.zipWithIndex.map { i =>
      newDFWithSplitColumn.join(attrValTransposedDF, i._1)
        .withColumnRenamed("Value", s"Value${i._2}")
        .drop("Attribute_Code")
    }

    val combinedDataFrame = dataFrameList.reduce(_.join(_, Seq("Day", "AttributeFormula"), "LEFT"))

    val toBeconcatColumn = combinedDataFrame.columns.filter(_.startsWith("Value"))

    combinedDataFrame
      .withColumn("AttributeFormulaValues", concat_ws("*", toBeconcatColumn.map(c => col(c)): _*))
      .select("Day", "AttributeFormula", "AttributeFormulaValues")

So my final output looks like this.

+-----------------+----------------+----------------------+
|Day              |AttributeFormula|AttributeFormulaValues|
+-----------------+----------------+----------------------+
|ATTRIB_DAY7_VALUE|A0004*A0003     |9007*1007             |
|ATTRIB_DAY6_VALUE|A0004*A0003     |936*736               |
|ATTRIB_DAY5_VALUE|A0004*A0003     |905*805               |
|ATTRIB_DAY4_VALUE|A0004*A0003     |904*804               |
|ATTRIB_DAY3_VALUE|A0004*A0003     |903*803               |
|ATTRIB_DAY2_VALUE|A0004*A0003     |902*802               |
|ATTRIB_DAY1_VALUE|A0004*A0003     |901*801               |
|ATTRIB_DAY7_VALUE|A0003           |1007                  |
|ATTRIB_DAY6_VALUE|A0003           |736                   |
|ATTRIB_DAY5_VALUE|A0003           |805                   |
|ATTRIB_DAY4_VALUE|A0003           |804                   |
|ATTRIB_DAY3_VALUE|A0003           |803                   |
|ATTRIB_DAY2_VALUE|A0003           |802                   |
|ATTRIB_DAY1_VALUE|A0003           |801                   |
+-----------------+----------------+----------------------+

This code works fine as long as I have a fixed attributeFormula (which is what Question 1 is about).

Question 2: How can I avoid using a list of DataFrames and the reduce function?

emrick's answer: How to split a column with inconsistent data in Spark

For Question 1, here is a possible solution:

Given that you have a DataFrame with the formulas:

 val attributeFormulaDF = Seq("A0004*A0003","A0003*A0005","A0003*A0004*A0005").toDF("formula")

You can split it into an array

val splitFormula = attributeFormulaDF.select(col("formula"),split(col("formula"),"\\*").as("split"))

After that, take the maximum array size

  val maxSize = splitFormula.select(max(size(col("split")))).first().getInt(0)

Now the interesting part: based on the maximum size, you can start generating columns and bind them to the array from the previous step

  val enhancedFormula = (0 until(maxSize)).foldLeft(splitFormula)( (df,i) => {
    df.withColumn(s"col_${i}",expr(s"split[${i}]"))
  })

Here is the output

+-----------------+--------------------+-----+-----+-----+
|          formula|               split|col_0|col_1|col_2|
+-----------------+--------------------+-----+-----+-----+
|      A0004*A0003|      [A0004, A0003]|A0004|A0003| null|
|      A0003*A0005|      [A0003, A0005]|A0003|A0005| null|
|A0003*A0004*A0005|[A0003, A0004, A0...|A0003|A0004|A0005|
+-----------------+--------------------+-----+-----+-----+

I think this can easily be adapted for Question 2.
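As a sketch of one way to avoid the list of DataFrames and the `reduce` chain (Question 2): instead of one join per formula column, explode the `split` array into rows with `posexplode`, join the attribute table once, then group by `Day` and `formula` and reassemble the values in formula order with `concat_ws`. This is a hedged sketch reusing the `splitFormula` and `attrValTransposedDF` names from above, not tested against the original data.

```scala
import org.apache.spark.sql.functions._

// One explode + one join + one groupBy, instead of N joins reduced together.
// posexplode keeps the position of each attribute within the formula, so the
// values can be reassembled in the same order as the formula terms.
val exploded = splitFormula
  .select(col("formula"), posexplode(col("split")).as(Seq("pos", "Attribute_Code")))

// Single join against the attribute table on Attribute_Code.
val joined = exploded.join(attrValTransposedDF, Seq("Attribute_Code"))

// Collect (pos, Value) pairs per (Day, formula), sort by position,
// then concatenate the values with "*".
val result = joined
  .groupBy("Day", "formula")
  .agg(
    concat_ws("*",
      sort_array(collect_list(struct(col("pos"), col("Value")))).getField("Value")
    ).as("AttributeFormulaValues")
  )
```

A formula like `A0004*A0003` then yields one row per `Day` with values such as `901*801`, matching the final output shown in the question, without building an intermediate DataFrame per formula column.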

