我正在尝试遍历包含要使用窗口函数进行排序的列的列名列表,并添加一个新列以及源列的最高值。为此,我将输入数据帧声明为for循环外的变量,然后在循环内对其进行更新。这导致Task not serializable
,据我所知,可能是由于以下事实:在主节点上声明了varialbe df,然后for循环正尝试在辅助节点上对其进行访问。我可以使用相同的逻辑避免这种错误,还是应该采用其他方法添加这些列?
def getHighestScoredAttributes(scoredDF: DataFrame,attributes: Array[String]) : DataFrame = {
var df = scoredDF
for (attribute <- attributes) {
val maxValidWindow = Window.partitionBy(df("druid")).orderBy(
when(df("validity") === lit("valid"),lit(1)).otherwise(lit(0)).desc,when(df(attribute).isnotNull,df("rank").desc_nulls_last)
val maxInvalidWindow = Window.partitionBy(df("druid")).orderBy(
when(df("validity") === lit("invalid"),df("rank").desc_nulls_last)
df = df.withColumn("valid_" + attribute,first(attribute) over maxValidWindow)
.withColumn("valid_" + attribute + "_dt",first("attest_dt") over maxValidWindow)
.withColumn("invalid_" + attribute,first(attribute) over maxInvalidWindow)
.withColumn("invalid_" + attribute + "_dt",first("attest_dt") over maxInvalidWindow)
}
df
}