在将列动态添加到数据帧时如何避免序列化错误?

我正在尝试遍历包含要使用窗口函数进行排序的列的列名列表,并添加一个新列以及源列的最高值。为此,我将输入数据帧声明为for循环外的变量,然后在循环内对其进行更新。这导致Task not serializable,据我所知,可能是由于以下事实:在主节点上声明了varialbe df,然后for循环正尝试在辅助节点上对其进行访问。我可以使用相同的逻辑避免这种错误,还是应该采用其他方法添加这些列?

 def getHighestScoredAttributes(scoredDF: DataFrame,attributes: Array[String]) : DataFrame = {
    var df = scoredDF
    for (attribute <- attributes) {
      val maxValidWindow = Window.partitionBy(df("druid")).orderBy(
        when(df("validity") === lit("valid"),lit(1)).otherwise(lit(0)).desc,when(df(attribute).isnotNull,df("rank").desc_nulls_last)
      val maxInvalidWindow = Window.partitionBy(df("druid")).orderBy(
        when(df("validity") === lit("invalid"),df("rank").desc_nulls_last)
      df = df.withColumn("valid_" + attribute,first(attribute) over maxValidWindow)
        .withColumn("valid_" + attribute + "_dt",first("attest_dt") over maxValidWindow)
        .withColumn("invalid_" + attribute,first(attribute) over maxInvalidWindow)
        .withColumn("invalid_" + attribute + "_dt",first("attest_dt") over maxInvalidWindow)
    }
    df
  }
woshiwuchao1 回答:在将列动态添加到数据帧时如何避免序列化错误?

原来的问题不是上面的代码,而是orderby中的rank列。它使用窗口函数在该函数的范围之外声明,但在函数主体中延迟求值,从而导致此错误。用@transient标记Window val可解决此问题。

本文链接:https://www.f2er.com/2937467.html

大家都在问