我在Spark中有一个RDD,其中的对象基于案例类:
ExampleCaseClass(user: User,stuff: Stuff)
我想使用Spark的ML管道,所以我将其转换为Spark数据帧.作为管道的一部分,我想将其中一列转换为条目为向量的列.由于我希望该向量的长度随模型而变化,因此它应作为特征转换的一部分构建到管道中.
所以我试图按如下方式定义一个Transformer:
class MyTransformer extends Transformer { val uid = "" val num: IntParam = new IntParam(this,"","") def setNum(value: Int): this.type = set(num,value) setDefault(num -> 50) def transform(df: DataFrame): DataFrame = { ... } def transformSchema(schema: StructType): StructType = { val inputFields = schema.fields StructType(inputFields :+ StructField("colName",???,true)) } def copy (extra: ParamMap): Transformer = defaultCopy(extra) }
如何指定结果字段的DataType(即填写???)?它将是一个简单类的Vector(Boolean,Int,Double等).似乎VectorUDT可能有效,但这对Spark是私有的.由于任何RDD都可以转换为DataFrame,因此任何案例类都可以转换为自定义DataType.但是我无法弄清楚如何手动执行此转换,否则我可以将它应用于包装矢量的一些简单的case类.
此外,如果我为列指定矢量类型,当我适应模型时,VectorAssembler会将矢量正确地处理成单独的特征吗?
对Spark而言还是新手,尤其是ML Pipeline,所以感谢任何建议.
解决方法
import org.apache.spark.ml.linalg.sqlDataTypes.VectorType def transformSchema(schema: StructType): StructType = { val inputFields = schema.fields StructType(inputFields :+ StructField("colName",VectorType,true)) }
在Spark 2.1中,VectorType使VectorUDT公开可用:
package org.apache.spark.ml.linalg import org.apache.spark.annotation.{DeveloperApi,Since} import org.apache.spark.sql.types.DataType /** * :: DeveloperApi :: * sql data types for vectors and matrices. */ @Since("2.0.0") @DeveloperApi object sqlDataTypes { /** Data type for [[Vector]]. */ val VectorType: DataType = new VectorUDT /** Data type for [[Matrix]]. */ val MatrixType: DataType = new MatrixUDT }