给出DataFrame:
+------------+---------+
|variableName|dataValue|
+------------+---------+
| IDKey| I1|
| b| y|
| a| x|
| IDKey| I2|
| a| z|
| b| w|
| c| q|
+------------+---------+
我想用相应的IDKey值创建一个新列,每当IDKey的dataValue更改时,每个值都会更改,这是预期的输出:
+------------+---------+----------+
|variableName|dataValue|idkeyValue|
+------------+---------+----------+
| IDKey| I1| I1|
| b| y| I1|
| a| x| I1|
| IDKey| I2| I2|
| a| z| I2|
| b| w| I2|
| c| q| I2|
+------------+---------+----------+
我尝试通过使用mapPartitions()
和全局变量来完成以下代码
var currentVarValue = ""
frame
.mapPartitions{ partition =>
partition.map { row =>
val (varName,dataValue) = (row.getString(0),row.getString(1))
val idKeyValue = if (currentVarValue != dataValue && varName == "IDKey") {
currentVarValue = dataValue
dataValue
} else {
currentVarValue
}
ExtendedData(varName,dataValue,currentVarValue)
}
}
但是,由于以下两个基本问题,此方法不起作用:Spark doesn't handle global variables,而且,这不符合函数式编程风格
我将很高兴对此提供任何帮助 谢谢!