Refit an existing Spark ML PipelineModel with new data

I am using Spark Structured Streaming to (more or less) train on data with a DecisionTreeRegressor.

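I would like to reuse my already fitted PipelineModel and fit it again on new data. Is that possible? I have tried loading my PipelineModel, adding its stages to a pipeline, and fitting the data to a new model.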

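      import org.apache.spark.ml.{Pipeline, PipelineModel}
      import org.apache.spark.ml.evaluation._
      import org.apache.spark.ml.feature.VectorAssembler
      import org.apache.spark.ml.regression.DecisionTreeRegressor
      import org.apache.spark.sql.DataFrame
      // Assumption: the exception caught below is Hadoop's "input path does not exist" error
      import org.apache.hadoop.mapred.InvalidInputException

      val modelDirectory = "/mnt/D834B3AF34B38ECE/DEV/hadoop/model"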
      var model : PipelineModel = _
      var newModel : PipelineModel = _
      var pipeline : Pipeline = _

        ..........

      val trainingData = // an instance of a DataFrame
      val testData = // an instance of a DataFrame

      val assembler = new VectorAssembler()
        .setInputCols(Array("routeId","stopId","month","dayOfWeek","hour","temperature","humidity","pressure","rain","snow","visibility"))
        .setOutputCol("features")

      val dt = new DecisionTreeRegressor()
        .setLabelCol("value")
        .setFeaturesCol("features")
        .setImpurity("variance")
        .setMaxDepth(30)
        .setMaxBins(32)
        .setMinInstancesPerNode(5)

      pipeline = new Pipeline()

      try {
        model = PipelineModel.load(modelDirectory)
        pipeline.setStages(model.stages)
      } catch {
        case iie: InvalidInputException => {
          pipeline.setStages(Array(assembler,dt))
          printf(iie.getMessage)
        }
        case unknownError: UnknownError => {
          printf(unknownError.getMessage)
        }
      }

      newModel = pipeline.fit(trainingData)

      // Make predictions.
      // Use the freshly fitted model; `model` is still unset if loading failed
      val predictions: DataFrame = newModel.transform(testData)

      // Select example rows to display.
      print(s"Predictions based on ${System.currentTimeMillis()} time train: ${System.lineSeparator()}")
      predictions.show(10,false)

      // Select (prediction, true label) and compute the test error.
      // RMSE via RegressionEvaluator, because DecisionTreeRegressor predicts continuous values
      val evaluator = new org.apache.spark.ml.evaluation.RegressionEvaluator()
        .setLabelCol("value")
        .setPredictionCol("prediction")
        .setMetricName("rmse")
      val rmse = evaluator.evaluate(predictions)

You can find my full source code here: https://github.com/Hakuhun/bkk-data-process-spark/blob/master/src/main/scala/hu/oe/bakonyi/bkk/BkkDataDeserializer.scala

Answer by wanglong52044: Refit an existing Spark ML PipelineModel with new data

In Spark 2.4.4 it is not possible to refit a model that has already been fitted.
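The reason the attempt in the question does not retrain anything: the stages of a loaded PipelineModel are already fitted transformers (for example a DecisionTreeRegressionModel), and Pipeline.fit only calls fit on estimator stages while passing transformers through unchanged. A common workaround is to rebuild the pipeline from unfitted stages and train it from scratch on the old and new rows together. The sketch below assumes you still have the historical training data available; `Retrain`, `freshPipeline` and `retrain` are hypothetical names, and the column names and parameters are copied from the question.

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.sql.DataFrame

object Retrain {
  // Feature columns as listed in the question's VectorAssembler
  val featureCols: Array[String] = Array(
    "routeId", "stopId", "month", "dayOfWeek", "hour",
    "temperature", "humidity", "pressure", "rain", "snow", "visibility")

  // Builds an unfitted pipeline; its last stage is an Estimator,
  // so calling fit() on it really trains a new tree
  def freshPipeline(): Pipeline = {
    val assembler = new VectorAssembler()
      .setInputCols(featureCols)
      .setOutputCol("features")
    val dt = new DecisionTreeRegressor()
      .setLabelCol("value")
      .setFeaturesCol("features")
      .setMaxDepth(30)
      .setMinInstancesPerNode(5)
    new Pipeline().setStages(Array(assembler, dt))
  }

  // Hypothetical helper: full retraining on historical + new rows.
  // This is not incremental learning; the tree is rebuilt each time.
  def retrain(historical: DataFrame, fresh: DataFrame): PipelineModel =
    freshPipeline().fit(historical.unionByName(fresh))
}
```

The result can then replace the stored model, for example with `Retrain.retrain(oldData, newData).write.overwrite().save(modelDirectory)`. The cost grows with the size of the accumulated data, so this only suits cases where periodic full retraining is acceptable.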

For machine learning solutions that learn continuously, take a look at the MLlib documentation. You can implement this with StreamingLinearRegressionWithSGD, StreamingKMeans, or StreamingLogisticRegressionWithSGD.
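As a rough illustration of one of these classes, here is a minimal sketch with StreamingLinearRegressionWithSGD. Note that these streaming algorithms belong to the RDD-based `spark.mllib` package and work on DStreams, not on Structured Streaming DataFrames, and that they fit linear models, not decision trees. The input directories, the batch interval, the step size and the feature count of 11 (matching the question's feature columns) are assumptions made for the example.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingRegressionSketch {
  // Creates an untrained streaming model whose weights start at zero
  def newModel(numFeatures: Int): StreamingLinearRegressionWithSGD =
    new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.zeros(numFeatures))
      .setStepSize(0.1)
      .setNumIterations(50)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StreamingRegressionSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Hypothetical input: text files with lines like "(label,[f1,f2,...])"
    // dropped into these directories while the job is running
    val training = ssc.textFileStream("/tmp/train").map(LabeledPoint.parse)
    val test = ssc.textFileStream("/tmp/test").map(LabeledPoint.parse)

    val model = newModel(numFeatures = 11)
    model.trainOn(training) // weights are updated on every incoming batch
    model.predictOnValues(test.map(lp => (lp.label, lp.features))).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

SGD-based linear models usually need scaled features to converge, so raw columns such as `routeId` or `pressure` would probably have to be encoded or normalized first.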

Also keep in mind that this is a streaming application, so your pipeline can keep learning continuously.
