Passing a column as a parameter into the body of a UDF

I have the following DataFrame:

+---------+----------+---------------------+------------+-----------------+
|CAR_OWNER|MOTOR_TYPE|ELECTRIFICATION_RATIO|ENERGY_IN_US|ENERGY_OUTSIDE_US|
+---------+----------+---------------------+------------+-----------------+
|     Alex|Electrical|                  1.0|          15|                0|
|      Bob| Thermical|                  0.0|           0|                5|
|   Claire|    Hybrid|                  0.5|           0|               10|
+---------+----------+---------------------+------------+-----------------+

and the following function:

def associateEnergy(motorType: String, consumedEnergy: Float, consumptionType: String, elecRatio: Float): Float =
  motorType match {
    case "Electrical" => if (consumptionType == "ELEC") consumedEnergy else 0f
    case "Thermical"  => if (consumptionType == "THERM") consumedEnergy else 0f
    case "Hybrid"     => if (consumptionType == "ELEC") consumedEnergy * elecRatio else consumedEnergy * (1 - elecRatio)
  }
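
As a quick sanity check, here is a minimal plain-Scala sketch (no Spark involved) that applies this function to the three sample rows. The AssociateEnergyExample object and the Float literals are only there for illustration; they are assumptions, not part of my real code:

```scala
object AssociateEnergyExample {
  // Same function as above, repeated so the sketch is self-contained.
  def associateEnergy(motorType: String, consumedEnergy: Float, consumptionType: String, elecRatio: Float): Float =
    motorType match {
      case "Electrical" => if (consumptionType == "ELEC") consumedEnergy else 0f
      case "Thermical"  => if (consumptionType == "THERM") consumedEnergy else 0f
      case "Hybrid"     => if (consumptionType == "ELEC") consumedEnergy * elecRatio else consumedEnergy * (1 - elecRatio)
    }

  // (owner, motorType, elecRatio, energyInUs, energyOutsideUs), copied from the sample table.
  // Storing the values as Float is an assumption made for this sketch.
  val rows: Seq[(String, String, Float, Float, Float)] = Seq(
    ("Alex", "Electrical", 1.0f, 15f, 0f),
    ("Bob", "Thermical", 0.0f, 0f, 5f),
    ("Claire", "Hybrid", 0.5f, 0f, 10f)
  )

  def main(args: Array[String]): Unit =
    rows.foreach { case (owner, motorType, ratio, inUs, outUs) =>
      // One value per target field.
      val elecIn   = associateEnergy(motorType, inUs, "ELEC", ratio)
      val elecOut  = associateEnergy(motorType, outUs, "ELEC", ratio)
      val thermIn  = associateEnergy(motorType, inUs, "THERM", ratio)
      val thermOut = associateEnergy(motorType, outUs, "THERM", ratio)
      println(s"$owner: ELEC_IN=$elecIn ELEC_OUT=$elecOut THERM_IN=$thermIn THERM_OUT=$thermOut")
    }
}
```

For Claire (Hybrid, ratio 0.5, 10 units outside the US) this splits the energy into 5.0 electrical and 5.0 thermal. Note that any motor type other than the three listed would throw a MatchError.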

I want to compute the following fields:

  • ELEC_ENERGY_IN_US
  • ELEC_ENERGY_OUTSIDE_US
  • THERM_ENERGY_IN_US
  • THERM_ENERGY_OUTSIDE_US

I can do this with the following UDF:

def associateEnergyUdf(consumptionType: String) = udf(
    (motorType: String, consumedEnergy: Float, elecRatio: Float) =>
      associateEnergy(motorType, consumedEnergy, consumptionType, elecRatio)
  )

and this code:

inputDf
    .withColumn("ELEC_ENERGY_IN_US", associateEnergyUdf("ELEC")(col("MOTOR_TYPE"), col("ENERGY_IN_US"), col("ELECTRIFICATION_RATIO")))
    .withColumn("ELEC_ENERGY_OUTSIDE_US", associateEnergyUdf("ELEC")(col("MOTOR_TYPE"), col("ENERGY_OUTSIDE_US"), col("ELECTRIFICATION_RATIO")))
    .withColumn("THERM_ENERGY_IN_US", associateEnergyUdf("THERM")(col("MOTOR_TYPE"), col("ENERGY_IN_US"), col("ELECTRIFICATION_RATIO")))
    .withColumn("THERM_ENERGY_OUTSIDE_US", associateEnergyUdf("THERM")(col("MOTOR_TYPE"), col("ENERGY_OUTSIDE_US"), col("ELECTRIFICATION_RATIO")))

But I don't want to repeat the col("MOTOR_TYPE") and col("ELECTRIFICATION_RATIO") arguments four times. So I created the following UDF:

def associateEnergyReducedUdf(consumptionType: String)(consumedEnergyCol: Column) = udf(
    () => associateEnergyUdf(consumptionType)(col("MOTOR_TYPE"),consumedEnergyCol,col("ELECTRIFICATION_RATIO"))
  )

so that I only need to call:

inputDf
    .withColumn("ELEC_ENERGY_IN_US",associateEnergyReducedUdf("ELEC")(col("ENERGY_IN_US"))())
    .withColumn("ELEC_ENERGY_OUTSIDE_US",associateEnergyReducedUdf("ELEC")(col("ENERGY_OUTSIDE_US"))())
    .withColumn("THERM_ENERGY_IN_US",associateEnergyReducedUdf("THERM")(col("ENERGY_IN_US"))())
    .withColumn("THERM_ENERGY_OUTSIDE_US",associateEnergyReducedUdf("THERM")(col("ENERGY_OUTSIDE_US"))())

But this results in the following error:

java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not supported

How can I achieve this without repeating the unnecessary arguments?

baojingxzwj answered: Passing a column as a parameter into the body of a UDF

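In your reduced UDF, you are wrapping an already existing UDF inside another UDF. The function passed to the outer udf(...) returns a Column, and Spark cannot derive a schema for a Column return type, hence the exception.

Try dropping the outer udf wrapper, so that associateEnergyReducedUdf is an ordinary function that applies associateEnergyUdf(consumptionType) to col("MOTOR_TYPE"), consumedEnergyCol and col("ELECTRIFICATION_RATIO") and returns the resulting Column. The trailing () at each call site then has to go as well.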
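
A minimal sketch of that idea, not necessarily the exact code originally posted here. It assumes a local SparkSession and Float-typed energy and ratio columns (both assumptions for illustration; the sample rows come from the question), and the ReducedUdfSketch object and withEnergyColumns helper are hypothetical names:

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object ReducedUdfSketch {
  // Function from the question, unchanged.
  def associateEnergy(motorType: String, consumedEnergy: Float, consumptionType: String, elecRatio: Float): Float =
    motorType match {
      case "Electrical" => if (consumptionType == "ELEC") consumedEnergy else 0f
      case "Thermical"  => if (consumptionType == "THERM") consumedEnergy else 0f
      case "Hybrid"     => if (consumptionType == "ELEC") consumedEnergy * elecRatio else consumedEnergy * (1 - elecRatio)
    }

  // UDF from the question: one UDF per consumption type, taking three columns.
  def associateEnergyUdf(consumptionType: String) = udf(
    (motorType: String, consumedEnergy: Float, elecRatio: Float) =>
      associateEnergy(motorType, consumedEnergy, consumptionType, elecRatio)
  )

  // No outer udf(...): applying a UDF to Columns already yields a Column,
  // so this is a plain Scala function despite the name kept from the question.
  def associateEnergyReducedUdf(consumptionType: String)(consumedEnergyCol: Column): Column =
    associateEnergyUdf(consumptionType)(col("MOTOR_TYPE"), consumedEnergyCol, col("ELECTRIFICATION_RATIO"))

  // Hypothetical helper: the call sites from the question, without the trailing ().
  def withEnergyColumns(inputDf: DataFrame): DataFrame =
    inputDf
      .withColumn("ELEC_ENERGY_IN_US", associateEnergyReducedUdf("ELEC")(col("ENERGY_IN_US")))
      .withColumn("ELEC_ENERGY_OUTSIDE_US", associateEnergyReducedUdf("ELEC")(col("ENERGY_OUTSIDE_US")))
      .withColumn("THERM_ENERGY_IN_US", associateEnergyReducedUdf("THERM")(col("ENERGY_IN_US")))
      .withColumn("THERM_ENERGY_OUTSIDE_US", associateEnergyReducedUdf("THERM")(col("ENERGY_OUTSIDE_US")))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("reduced-udf-sketch").getOrCreate()
    import spark.implicits._

    // Sample rows from the question; Float values are an assumption of this sketch.
    val inputDf = Seq(
      ("Alex", "Electrical", 1.0f, 15f, 0f),
      ("Bob", "Thermical", 0.0f, 0f, 5f),
      ("Claire", "Hybrid", 0.5f, 0f, 10f)
    ).toDF("CAR_OWNER", "MOTOR_TYPE", "ELECTRIFICATION_RATIO", "ENERGY_IN_US", "ENERGY_OUTSIDE_US")

    withEnergyColumns(inputDf).show()
    spark.stop()
  }
}
```

The design point is that only the innermost function needs to be a UDF; everything that merely decides which columns to pass can stay ordinary Scala that builds Column expressions.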

This works.
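
If the four withColumn calls themselves still feel repetitive, one possible follow-up is to fold over the combinations of consumption type and energy column. This is only a sketch: the EnergyColumns object and withEnergyColumns helper are hypothetical names, and energyFor stands for any function of shape String => Column => Column, for example a version of associateEnergyReducedUdf that returns a Column rather than a UDF:

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col

object EnergyColumns {
  // Consumption types and source columns taken from the question.
  val consumptionTypes: Seq[String] = Seq("ELEC", "THERM")
  val energyCols: Seq[String] = Seq("ENERGY_IN_US", "ENERGY_OUTSIDE_US")

  // Adds one column per (consumption type, energy column) pair,
  // named like ELEC_ENERGY_IN_US or THERM_ENERGY_OUTSIDE_US.
  def withEnergyColumns(df: DataFrame, energyFor: String => Column => Column): DataFrame = {
    val targets = for {
      consumptionType <- consumptionTypes
      energyCol       <- energyCols
    } yield (consumptionType, energyCol)

    targets.foldLeft(df) { case (acc, (consumptionType, energyCol)) =>
      acc.withColumn(s"${consumptionType}_$energyCol", energyFor(consumptionType)(col(energyCol)))
    }
  }
}
```

A call would then look like EnergyColumns.withEnergyColumns(inputDf, associateEnergyReducedUdf), relying on Scala turning the curried method into a function value.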
