如何根据某些条件处理Akka流?

假设存在一些要处理的文件流,并且满足条件时仅应处理(消耗)特定文件。

即仅当流包含名为​​“ aaa”的文件时,才处理名为“ bbb”的文件

SomeFile(name: String)

正确的(推荐的)方法是什么?

lisuge 回答:如何根据某些条件处理Akka流?

好的,这是一个例子。在触发触发器之前,请谨慎在此处建立太大的缓冲区

class FileFinder {

  def matchFiles(triggerName: String,matchName: String): Flow[SomeFile,SomeFile,NotUsed] =
    Flow[SomeFile].statefulMapConcat(
      statefulMatcher(matches(triggerName),matches(matchName)))

  private def matches(matchName: String): SomeFile => Boolean = {
    case SomeFile(name) if name == matchName => true
    case _                                   => false
  }

  private def statefulMatcher(
      triggerFilter: => SomeFile => Boolean,sendFilter: SomeFile => Boolean): () => SomeFile => List[SomeFile] = {
    var found = false
    var sendFiles: List[SomeFile] = Nil
    () => file: SomeFile =>
      {
        file match {
          case f if triggerFilter(f) =>
            found = true
            val send = sendFiles
            sendFiles = Nil
            send
          case f if sendFilter(f) =>
            if (found)
              List(f)
            else {
              sendFiles = f :: sendFiles
              Nil
            }
          case _ => Nil
        }
      }
  }
}

object FileFinder extends FileFinder {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("finder")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val executor: ExecutionContextExecutor =
      materializer.executionContext
    implicit val loggingAdapter: LoggingAdapter = system.log

    val files = List(SomeFile("aaa"),SomeFile("bbb"),SomeFile("aaa"))
    Source(files)
      .via(matchFiles("bbb","aaa"))
      .runForeach(println(_))
      .andThen({
        case Success(_) =>
          println("Success")
          system.terminate()
        case Failure(ex) =>
          loggingAdapter.error("Shouldn't happen...",ex)
          system.terminate()
      })
  }

}

case class SomeFile(name: String)
本文链接:https://www.f2er.com/3162413.html

大家都在问