假设存在一些要处理的文件流,并且满足条件时仅应处理(消耗)特定文件。
即仅当流包含名为“ aaa”的文件时,才处理名为“ bbb”的文件
SomeFile(name: String)
正确的(推荐的)方法是什么?
假设存在一些要处理的文件流,并且满足条件时仅应处理(消耗)特定文件。
即仅当流包含名为“ aaa”的文件时,才处理名为“ bbb”的文件
SomeFile(name: String)
正确的(推荐的)方法是什么?
好的,这是一个例子。在触发触发器之前,请谨慎在此处建立太大的缓冲区
class FileFinder {
def matchFiles(triggerName: String,matchName: String): Flow[SomeFile,SomeFile,NotUsed] =
Flow[SomeFile].statefulMapConcat(
statefulMatcher(matches(triggerName),matches(matchName)))
private def matches(matchName: String): SomeFile => Boolean = {
case SomeFile(name) if name == matchName => true
case _ => false
}
private def statefulMatcher(
triggerFilter: => SomeFile => Boolean,sendFilter: SomeFile => Boolean): () => SomeFile => List[SomeFile] = {
var found = false
var sendFiles: List[SomeFile] = Nil
() => file: SomeFile =>
{
file match {
case f if triggerFilter(f) =>
found = true
val send = sendFiles
sendFiles = Nil
send
case f if sendFilter(f) =>
if (found)
List(f)
else {
sendFiles = f :: sendFiles
Nil
}
case _ => Nil
}
}
}
}
object FileFinder extends FileFinder {
def main(args: Array[String]): Unit = {
implicit val system: ActorSystem = ActorSystem("finder")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executor: ExecutionContextExecutor =
materializer.executionContext
implicit val loggingAdapter: LoggingAdapter = system.log
val files = List(SomeFile("aaa"),SomeFile("bbb"),SomeFile("aaa"))
Source(files)
.via(matchFiles("bbb","aaa"))
.runForeach(println(_))
.andThen({
case Success(_) =>
println("Success")
system.terminate()
case Failure(ex) =>
loggingAdapter.error("Shouldn't happen...",ex)
system.terminate()
})
}
}
case class SomeFile(name: String)