我正在使用Spark结构化流-或多或少-通过DecisionTreeRegressor调整数据。
我想重用我已经安装的PipelineModel以再次适合新数据。 可能吗? 我已经尝试加载我的PipelineModel并将其阶段添加到管道中并将数据拟合到新模型上。
val modelDirectory = "/mnt/D834B3AF34B38ECE/DEV/hadoop/model"
var model : PipelineModel = _
var newModel : PipelineModel = _
var pipeline : Pipeline = _
..........
val trainingData = //an instance of a dataframne
val testData = //an instance of a dataframne
val assembler = new VectorAssembler()
.setInputCols(Array("routeId","stopId","month","dayOfWeek","hour","temperature","humidity","pressure","rain","snow","visibility"))
.setOutputCol("features")
val dt = new DecisionTreeRegressor()
.setLabelCol("value")
.setfeaturesCol("features")
.setImpurity("variance")
.setMaxDepth(30)
.setMaxBins(32)
.setMinInstancesPerNode(5)
pipeline = new Pipeline()
try {
model = PipelineModel.load(modelDirectory)
pipeline.setStages(model.stages)
} catch {
case iie: InvalidInputException => {
pipeline.setStages(Array(assembler,dt))
printf(iie.getMessage)
}
case unknownError: UnknownError => {
printf(unknownError.getMessage)
}
}
newModel = pipeline.fit(trainingData)
// Make predictions.
val predictions: DataFrame = model.transform(testData)
// Select example rows to display.
print(s"Predictions based on ${System.currentTimeMillis()} time train: ${System.lineseparator()}")
predictions.show(10,false)
// Select (prediction,true label) and compute test error
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol("value")
.setPredictionCol("prediction")
.setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
您可以在以下位置找到我的完整源代码:https://github.com/Hakuhun/bkk-data-process-spark/blob/master/src/main/scala/hu/oe/bakonyi/bkk/BkkDataDeserializer.scala