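I am trying to join two DataFrames and dynamically create new columns from the attribute values (or at least I am trying to).
I have to split a column from FormulaTable into additional columns and then join the result with the attribute table.
However, I cannot get the column to split dynamically.
I have two questions, which I have marked as Question 1 and Question 2 in the steps below.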
Currently, the data in my FormulaTable looks like this:
import spark.implicits._ // assumes a SparkSession named spark (as in spark-shell); provides .toDF
val attributeFormulaDF = Seq("A0004*A0003","A0003*A0005").toDF("AttributeFormula")
So the data looks like this:
+----------------+
|AttributeFormula|
+----------------+
|A0004*A0003     |
|A0003*A0005     |
+----------------+
The attribute data looks like this:
val attrValTransposedDF = Seq(
  "A0003" -> Seq("801","802","803","804","805","736","1007"),
  "A0004" -> Seq("901","902","903","904","905","936","9007")
).flatMap { case (code, values) =>
  // one row per day (ATTRIB_DAY1_VALUE .. ATTRIB_DAY7_VALUE); the other fields are the same in every row
  values.zipWithIndex.map { case (v, i) => ("2007", 201801, "DEL", code, "NA", s"ATTRIB_DAY${i + 1}_VALUE", v) }
}.toDF("Store_Number","Attribute_Week_Number","Department_Code","Attribute_Code","Attribute_General_Value","Day","Value")
.select("Attribute_Code","Day","Value")
So the data looks like this:
+--------------+-----------------+-----+
|Attribute_Code|Day              |Value|
+--------------+-----------------+-----+
|A0003         |ATTRIB_DAY1_VALUE|801  |
|A0003         |ATTRIB_DAY2_VALUE|802  |
|A0003         |ATTRIB_DAY3_VALUE|803  |
|A0003         |ATTRIB_DAY4_VALUE|804  |
|A0003         |ATTRIB_DAY5_VALUE|805  |
|A0003         |ATTRIB_DAY6_VALUE|736  |
|A0003         |ATTRIB_DAY7_VALUE|1007 |
|A0004         |ATTRIB_DAY1_VALUE|901  |
|A0004         |ATTRIB_DAY2_VALUE|902  |
|A0004         |ATTRIB_DAY3_VALUE|903  |
|A0004         |ATTRIB_DAY4_VALUE|904  |
|A0004         |ATTRIB_DAY5_VALUE|905  |
|A0004         |ATTRIB_DAY6_VALUE|936  |
|A0004         |ATTRIB_DAY7_VALUE|9007 |
+--------------+-----------------+-----+
Now I split it on *:
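import org.apache.spark.sql.functions._ // col, split, concat_ws
val firstDF = attributeFormulaDF.select("AttributeFormula")
// the number of parts is taken from the FIRST row only, which is what breaks when rows differ in length (Question 1)
val rowVal = firstDF.first.mkString.split("\\*").length
val columnSeq = (0 until rowVal).map(i => col("temp").getItem(i).as(s"col$i"))
val newDFWithSplitColumn = firstDF.withColumn("temp",split(col("AttributeFormula"),"\\*"))
.select(col("*") +: columnSeq :_*).drop("temp")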
I based this on the Stack Overflow post "Split 1 column into 3 columns in spark scala".
So the split columns look like this:
+----------------+-----+-----+
|AttributeFormula|col0 |col1 |
+----------------+-----+-----+
|A0004*A0003     |A0004|A0003|
|A0003*A0005     |A0003|A0005|
+----------------+-----+-----+
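The "\\*" in the code above is needed because both String.split and Spark's functions.split treat the delimiter as a Java regular expression, and * is a regex quantifier. A minimal plain-Scala sketch of the same splitting logic, using java.util.regex.Pattern.quote as an alternative to escaping by hand (the SplitDemo object and splitFormula helper are hypothetical names):

```scala
import java.util.regex.Pattern

object SplitDemo {
  // "*" is a regex quantifier, so quote it to match it literally.
  val delimiter: String = Pattern.quote("*")

  // Splits a formula such as "A0003*A0004*A0005" into its attribute codes.
  def splitFormula(formula: String): Array[String] = formula.split(delimiter)

  def main(args: Array[String]): Unit = {
    println(splitFormula("A0003*A0004*A0005").mkString(","))
    println(splitFormula("A0004").length)
  }
}
```

The same quoted pattern can be passed to split(col("AttributeFormula"), Pattern.quote("*")) in Spark.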
Question 1: If my AttributeFormula can contain any number of attributes (it is just a string), how can I split it dynamically?
For example:
+-----------------+
|AttributeFormula |
+-----------------+
|A0004            |
|A0004*A0003      |
|A0003*A0005      |
|A0003*A0004      |
|A0003*A0004*A0005|
+-----------------+
and I would like to get something like this:
+-----------------+-----+-----+-----+
|AttributeFormula |col0 |col1 |col2 |
+-----------------+-----+-----+-----+
|A0004            |A0004|null |null |
|A0004*A0003      |A0004|A0003|null |
|A0003*A0005      |A0003|A0005|null |
|A0003*A0004      |A0003|A0004|null |
|A0003*A0004*A0005|A0003|A0004|A0005|
+-----------------+-----+-----+-----+
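One possible approach to Question 1, sketched under assumptions rather than offered as the definitive fix: instead of reading the part count from the first row (first.mkString...), compute the largest array size over all rows with size and max, then generate that many columns. Wrapping getItem in when(...) without otherwise should leave missing positions as null even if ANSI mode is enabled. The DynamicSplit object and splitFormula method are hypothetical names, and the sketch assumes the input DataFrame is non-empty.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, max, size, split, when}

object DynamicSplit {
  // Splits AttributeFormula into col0 .. col(N-1), where N is the largest number
  // of "*"-separated attributes in ANY row, not just the first one.
  def splitFormula(df: DataFrame): DataFrame = {
    val withArray = df.withColumn("temp", split(col("AttributeFormula"), "\\*"))
    // Runs one extra Spark job to find the widest formula; assumes df is non-empty.
    val maxParts: Int = withArray.agg(max(size(col("temp")))).head.getInt(0)
    // when(...) without otherwise yields null for rows with fewer parts.
    val parts = (0 until maxParts).map { i =>
      when(size(col("temp")) > i, col("temp").getItem(i)).as(s"col$i")
    }
    withArray.select(col("AttributeFormula") +: parts: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dynamic-split").getOrCreate()
    import spark.implicits._
    val formulas = Seq("A0004", "A0004*A0003", "A0003*A0005", "A0003*A0004", "A0003*A0004*A0005")
      .toDF("AttributeFormula")
    splitFormula(formulas).show(false)
    spark.stop()
  }
}
```

The cost of this design is the extra aggregation pass to find the maximum width before the columns can be defined.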
Next, I join the split AttributeFormula columns with the attribute values to get the formula-value column.
val joinColumnCondition = newDFWithSplitColumn.columns
.withFilter(_.startsWith("col"))
.map(col(_) === attrValTransposedDF("Attribute_Code"))
//using zipWithIndex to make the value columns separate and to avoid ambiguous error while joining
val dataFrameList = joinColumnCondition.zipWithIndex.map {
i =>
newDFWithSplitColumn.join(attrValTransposedDF,i._1)
.withColumnRenamed("Value",s"Value${i._2}")
.drop("Attribute_Code")
}
val combinedDataFrame = dataFrameList.reduce(_.join(_,Seq("Day","AttributeFormula"),"LEFT"))
val toBeconcatColumn = combinedDataFrame.columns.filter(_.startsWith("Value"))
combinedDataFrame
.withColumn("AttributeFormulaValues",concat_ws("*",toBeconcatColumn.map(c => col(c)): _*))
.select("Day","AttributeFormula","AttributeFormulaValues")
So my final output looks like this:
+-----------------+----------------+----------------------+
|Day              |AttributeFormula|AttributeFormulaValues|
+-----------------+----------------+----------------------+
|ATTRIB_DAY7_VALUE|A0004*A0003     |9007*1007             |
|ATTRIB_DAY6_VALUE|A0004*A0003     |936*736               |
|ATTRIB_DAY5_VALUE|A0004*A0003     |905*805               |
|ATTRIB_DAY4_VALUE|A0004*A0003     |904*804               |
|ATTRIB_DAY3_VALUE|A0004*A0003     |903*803               |
|ATTRIB_DAY2_VALUE|A0004*A0003     |902*802               |
|ATTRIB_DAY1_VALUE|A0004*A0003     |901*801               |
|ATTRIB_DAY7_VALUE|A0003           |1007                  |
|ATTRIB_DAY6_VALUE|A0003           |736                   |
|ATTRIB_DAY5_VALUE|A0003           |805                   |
|ATTRIB_DAY4_VALUE|A0003           |804                   |
|ATTRIB_DAY3_VALUE|A0003           |803                   |
|ATTRIB_DAY2_VALUE|A0003           |802                   |
|ATTRIB_DAY1_VALUE|A0003           |801                   |
+-----------------+----------------+----------------------+
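This code works fine as long as AttributeFormula always has a fixed number of attributes (which is what Question 1 is about).
Question 2: How can I avoid building a list of DataFrames and combining them with reduce?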
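One way to avoid the list of DataFrames and the reduce, offered as a sketch rather than the only option: instead of splitting into a fixed set of columns, explode each formula into one row per attribute with posexplode, join once against the attribute table, and rebuild the string per Day with groupBy. This also sidesteps the dynamic column count from Question 1. The FormulaValues object and formulaValues method are hypothetical names; the sketch assumes the attribute table has the Attribute_Code, Day and Value columns shown above, with Value as a string.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws, expr, posexplode, split}

object FormulaValues {
  // One explode + one join + one groupBy, instead of one join per split column
  // followed by reduce over a list of DataFrames.
  def formulaValues(formulas: DataFrame, attrValues: DataFrame): DataFrame = {
    // One row per (formula, position, attribute code); distinct avoids double-counting repeated formulas.
    val exploded = formulas.select("AttributeFormula").distinct().select(
      col("AttributeFormula"),
      posexplode(split(col("AttributeFormula"), "\\*")).as(Seq("pos", "Attribute_Code")))

    exploded
      .join(attrValues, Seq("Attribute_Code")) // inner join: codes with no value for a Day are skipped
      .groupBy("Day", "AttributeFormula")
      // collect_list does not guarantee order, so sort the (pos, Value) structs by pos
      // and keep only the values (transform is a SQL higher-order function, Spark 2.4+).
      .agg(expr("transform(sort_array(collect_list(struct(pos, Value))), x -> x.Value)").as("vals"))
      .select(col("Day"), col("AttributeFormula"), concat_ws("*", col("vals")).as("AttributeFormulaValues"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("formula-values").getOrCreate()
    import spark.implicits._
    val formulas = Seq("A0004*A0003", "A0003").toDF("AttributeFormula")
    // A few rows taken from the attribute table in the question.
    val attrValues = Seq(
      ("A0003", "ATTRIB_DAY1_VALUE", "801"), ("A0004", "ATTRIB_DAY1_VALUE", "901"),
      ("A0003", "ATTRIB_DAY2_VALUE", "802"), ("A0004", "ATTRIB_DAY2_VALUE", "902")
    ).toDF("Attribute_Code", "Day", "Value")
    formulaValues(formulas, attrValues).orderBy("AttributeFormula", "Day").show(false)
    spark.stop()
  }
}
```

Because the join is inner, an attribute with no value for a given Day is simply left out of that Day's string, which is close to how concat_ws skips the null Value columns in the original code.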