根据Spark的先前值设置列值,而无需重复分组属性

给出DataFrame:

+------------+---------+
|variableName|dataValue|
+------------+---------+
|       IDKey|       I1|
|           b|        y|
|           a|        x|
|       IDKey|       I2|
|           a|        z|
|           b|        w|
|           c|        q|
+------------+---------+

我想用相应的IDKey值创建一个新列,每当IDKey的dataValue更改时,每个值都会更改,这是预期的输出:

+------------+---------+----------+
|variableName|dataValue|idkeyValue|
+------------+---------+----------+
|       IDKey|       I1|        I1|
|           b|        y|        I1|
|           a|        x|        I1|
|       IDKey|       I2|        I2|
|           a|        z|        I2|
|           b|        w|        I2|
|           c|        q|        I2|
+------------+---------+----------+

我尝试通过使用mapPartitions()和全局变量来完成以下代码

var currentVarValue = ""
frame
  .mapPartitions{ partition =>
    partition.map { row =>
      val (varName,dataValue) = (row.getString(0),row.getString(1))

      val idKeyValue = if (currentVarValue != dataValue && varName == "IDKey") {
        currentVarValue = dataValue
        dataValue
      } else {
        currentVarValue
      }

      ExtendedData(varName,dataValue,currentVarValue)
    }
  }

但是,由于以下两个基本问题,此方法不起作用:Spark doesn't handle global variables,而且,这不符合函数式编程风格

我将很高兴对此提供任何帮助 谢谢!

yaya19940102 回答:根据Spark的先前值设置列值,而无需重复分组属性

  

您无法以Spark方式解决高效表现问题,   没有足够的初始信息供Spark处理   确保所有数据都在同一分区中。如果我们都做   在同一个分区中进行处理,那么这不是真正的意图   火花。

实际上,不能发布明智的partitionBy(通过Window函数)。这里的问题是,数据表示连续的此类数据的长列表,如果前一个分区中的数据与当前分区相关,则需要跨分区进行查找。可以做到,但这是一项艰巨的工作。 zero323在这里某处试图解决该问题,但是如果我没记错的话,这很麻烦。

做到这一点的逻辑很容易,但是使用Spark却有问题。

没有分区按数据将所有数据改组到一个分区,可能会导致OOM和空间问题。

对不起。

本文链接:https://www.f2er.com/3151106.html

大家都在问