I have the following DataFrame:
+---------+----------+---------------------+------------+-----------------+
|CAR_OWNER|MOTOR_TYPE|ELECTRIFICATION_RATIO|ENERGY_IN_US|ENERGY_OUTSIDE_US|
+---------+----------+---------------------+------------+-----------------+
|     Alex|Electrical|                  1.0|          15|                0|
|      Bob| Thermical|                  0.0|           0|                5|
|   Claire|    Hybrid|                  0.5|           0|               10|
+---------+----------+---------------------+------------+-----------------+
and the following function:
def associateEnergy(motorType: String, consumedEnergy: Float, consumptionType: String, elecRatio: Float): Float =
  motorType match {
    case "Electrical" => if (consumptionType == "ELEC") consumedEnergy else 0f
    case "Thermical" => if (consumptionType == "THERM") consumedEnergy else 0f
    case "Hybrid" => if (consumptionType == "ELEC") consumedEnergy * elecRatio else consumedEnergy * (1 - elecRatio)
  }
I want to compute the following fields:
- ELEC_ENERGY_IN_US
- ELEC_ENERGY_OUTSIDE_US
- THERM_ENERGY_IN_US
- THERM_ENERGY_OUTSIDE_US
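As a sanity check, here is a minimal sketch of the values I would expect for the three sample rows, applying associateEnergy directly without Spark. The object name ExpectedEnergy, the helper fields, and the tuple layout are only illustrative and are not part of my actual job.

```scala
object ExpectedEnergy {
  // Same logic as the associateEnergy function shown above.
  def associateEnergy(motorType: String, consumedEnergy: Float, consumptionType: String, elecRatio: Float): Float =
    motorType match {
      case "Electrical" => if (consumptionType == "ELEC") consumedEnergy else 0f
      case "Thermical" => if (consumptionType == "THERM") consumedEnergy else 0f
      case "Hybrid" => if (consumptionType == "ELEC") consumedEnergy * elecRatio else consumedEnergy * (1 - elecRatio)
    }

  // Hypothetical helper: returns (ELEC_ENERGY_IN_US, ELEC_ENERGY_OUTSIDE_US,
  // THERM_ENERGY_IN_US, THERM_ENERGY_OUTSIDE_US) for one row.
  def fields(motorType: String, elecRatio: Float, inUs: Float, outsideUs: Float): (Float, Float, Float, Float) =
    (
      associateEnergy(motorType, inUs, "ELEC", elecRatio),
      associateEnergy(motorType, outsideUs, "ELEC", elecRatio),
      associateEnergy(motorType, inUs, "THERM", elecRatio),
      associateEnergy(motorType, outsideUs, "THERM", elecRatio)
    )

  // The three sample rows: (owner, motor type, ratio, energy in US, energy outside US).
  val rows = Seq(
    ("Alex", "Electrical", 1.0f, 15f, 0f),
    ("Bob", "Thermical", 0.0f, 0f, 5f),
    ("Claire", "Hybrid", 0.5f, 0f, 10f)
  )

  def main(args: Array[String]): Unit =
    rows.foreach { case (owner, motor, ratio, inUs, outsideUs) =>
      val (elecIn, elecOut, thermIn, thermOut) = fields(motor, ratio, inUs, outsideUs)
      println(s"$owner: $elecIn, $elecOut, $thermIn, $thermOut")
    }
  // prints:
  // Alex: 15.0, 0.0, 0.0, 0.0
  // Bob: 0.0, 0.0, 0.0, 5.0
  // Claire: 0.0, 5.0, 0.0, 5.0
}
```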
I can do this with the following UDF:
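import org.apache.spark.sql.functions.{col, udf}

// consumedEnergy must be a lambda parameter: it is passed as the second column below.
def associateEnergyUdf(consumptionType: String) = udf(
  (motorType: String, consumedEnergy: Float, elecRatio: Float) =>
    associateEnergy(motorType, consumedEnergy, consumptionType, elecRatio)
)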
and this code:
inputDf
  .withColumn("ELEC_ENERGY_IN_US", associateEnergyUdf("ELEC")(col("MOTOR_TYPE"), col("ENERGY_IN_US"), col("ELECTRIFICATION_RATIO")))
  .withColumn("ELEC_ENERGY_OUTSIDE_US", associateEnergyUdf("ELEC")(col("MOTOR_TYPE"), col("ENERGY_OUTSIDE_US"), col("ELECTRIFICATION_RATIO")))
  .withColumn("THERM_ENERGY_IN_US", associateEnergyUdf("THERM")(col("MOTOR_TYPE"), col("ENERGY_IN_US"), col("ELECTRIFICATION_RATIO")))
  .withColumn("THERM_ENERGY_OUTSIDE_US", associateEnergyUdf("THERM")(col("MOTOR_TYPE"), col("ENERGY_OUTSIDE_US"), col("ELECTRIFICATION_RATIO")))
But I don't want to repeat the col("MOTOR_TYPE") and col("ELECTRIFICATION_RATIO") arguments four times. So I created the following UDF:
def associateEnergyReducedUdf(consumptionType: String)(consumedEnergyCol: Column) = udf(
  () => associateEnergyUdf(consumptionType)(col("MOTOR_TYPE"), consumedEnergyCol, col("ELECTRIFICATION_RATIO"))
)
so that I only need to call:
inputDf
  .withColumn("ELEC_ENERGY_IN_US", associateEnergyReducedUdf("ELEC")(col("ENERGY_IN_US"))())
  .withColumn("ELEC_ENERGY_OUTSIDE_US", associateEnergyReducedUdf("ELEC")(col("ENERGY_OUTSIDE_US"))())
  .withColumn("THERM_ENERGY_IN_US", associateEnergyReducedUdf("THERM")(col("ENERGY_IN_US"))())
  .withColumn("THERM_ENERGY_OUTSIDE_US", associateEnergyReducedUdf("THERM")(col("ENERGY_OUTSIDE_US"))())
But this results in the following error:
java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.Column is not supported
How can I achieve this without repeating the unnecessary arguments?
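One direction that might work, offered as a sketch rather than a confirmed fix: the error seems to come from wrapping the call in a second udf(...). Spark derives a UDF's result schema from the lambda's return type, and here the lambda returns a Column, which has no Spark SQL type. A plain Scala method that returns a Column avoids that, because applying a UDF to columns already yields a Column. The names EnergyColumns, associateEnergyReduced and withEnergyColumns are hypothetical, and the casts to float are a precaution that assumes the input columns may be stored as integer or double.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, udf}

object EnergyColumns {
  // Same row-level logic as in the question.
  def associateEnergy(motorType: String, consumedEnergy: Float, consumptionType: String, elecRatio: Float): Float =
    motorType match {
      case "Electrical" => if (consumptionType == "ELEC") consumedEnergy else 0f
      case "Thermical" => if (consumptionType == "THERM") consumedEnergy else 0f
      case "Hybrid" => if (consumptionType == "ELEC") consumedEnergy * elecRatio else consumedEnergy * (1 - elecRatio)
    }

  // The only real UDF: its lambda receives row values, never Column objects.
  def associateEnergyUdf(consumptionType: String) = udf(
    (motorType: String, consumedEnergy: Float, elecRatio: Float) =>
      associateEnergy(motorType, consumedEnergy, consumptionType, elecRatio)
  )

  // Plain method (not a UDF) that fixes the two repeated columns
  // and returns the resulting Column expression.
  def associateEnergyReduced(consumptionType: String)(consumedEnergyCol: Column): Column =
    associateEnergyUdf(consumptionType)(
      col("MOTOR_TYPE"),
      consumedEnergyCol.cast("float"),            // assumption: may be an integer column
      col("ELECTRIFICATION_RATIO").cast("float")  // assumption: may be a double column
    )

  // Adds the four derived columns to a DataFrame shaped like the one above.
  def withEnergyColumns(inputDf: DataFrame): DataFrame =
    inputDf
      .withColumn("ELEC_ENERGY_IN_US", associateEnergyReduced("ELEC")(col("ENERGY_IN_US")))
      .withColumn("ELEC_ENERGY_OUTSIDE_US", associateEnergyReduced("ELEC")(col("ENERGY_OUTSIDE_US")))
      .withColumn("THERM_ENERGY_IN_US", associateEnergyReduced("THERM")(col("ENERGY_IN_US")))
      .withColumn("THERM_ENERGY_OUTSIDE_US", associateEnergyReduced("THERM")(col("ENERGY_OUTSIDE_US")))
}
```

With this shape the call site drops the trailing `()` and would reduce to `EnergyColumns.withEnergyColumns(inputDf)`.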