我最终使用了Akka演员。
我对您所需的输出进行了假设,以便该程序既简单又快速。我所做的假设是输出是JSON,未保留层次结构,并且可以接受多个文件。如果您不喜欢JSON,可以将其替换为其他内容,但另外两个假设对于保持程序的当前速度和简单性很重要。
您可以设置一些命令行参数。如果未设置,则将使用默认值。默认值包含在Main.scala中。
命令行参数如下:
(0)您要从中开始的根目录; (无默认值)
(1)此程序中所有超时的超时间隔(以秒为单位); (默认为60)
(2)要使用的打印机参与者的数量;这将是创建的日志文件的数量; (默认值为50)
(3)用于监视参与者的滴答间隔; (默认为500)
对于超时,请记住,这是在程序完成时等待的时间间隔的值。因此,如果您从事一项小工作,并且想知道为什么要花一分钟的时间才能完成,那是因为它在关闭程序之前正在等待超时间隔。
由于您正在执行如此大的作业,因此默认超时60可能太小。如果您收到有关超时的抱怨,请增加超时值。
请注意,如果您的刻度间隔设置得太高,则您的程序可能会过早关闭。
要运行,只需在项目文件夹中启动sbt,然后键入
runMain Main <canonical path of root directory>
我不知道如何用Java获取文件组。您需要对此进行研究,并将相关代码添加到Entity.scala和TraverseActor.scala。
TraverseActor.scala中的f.list()有时也返回null,这就是为什么我将其包装在Option中。您必须调试该问题,以确保不会在某些文件上无提示失败。
现在,这是所有文件的内容。
build.sbt
name := "stackoverflow20191110"
version := "0.1"
scalaVersion := "2.12.1"
libraryDependencies ++= Seq(
"io.circe" %% "circe-core","io.circe" %% "circe-generic","io.circe" %% "circe-parser"
).map(_ % "0.12.2")
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.16"
Entity.scala
import io.circe.Encoder
import io.circe.generic.semiauto._
sealed trait Entity {
def path: String
def owner: String
def permissions: String
def lastModifiedTime: String
def creationTime: String
def lastAccessTime: String
def hashCode: Int
}
object Entity {
implicit val entityEncoder: Encoder[Entity] = deriveEncoder
}
case class FileEntity(path: String,owner: String,permissions: String,lastModifiedTime: String,creationTime: String,lastAccessTime: String) extends Entity
object fileentityEncoder {
implicit val fileentityEncoder: Encoder[FileEntity] = deriveEncoder
}
case class DirectoryEntity(path: String,lastAccessTime: String) extends Entity
object DirectoryEntity {
implicit val directoryentityEncoder: Encoder[DirectoryEntity] = deriveEncoder
}
case class Contents(path: String,files: IndexedSeq[Entity])
object Contents {
implicit val contentsEncoder: Encoder[Contents] = deriveEncoder
}
Main.scala
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import java.io.{BufferedWriter,File,FileWriter}
import ShutDownActor.ShutDownYet
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try
object Main {
val defaultNumPrinters = 50
val defaultMonitorTickInterval = 500
val defaultTimeoutInS = 60
def main(args: Array[String]): Unit = {
val timeoutInS = Try(args(1).toInt).toOption.getOrElse(defaultTimeoutInS)
val system = ActorSystem("SearchHierarchy")
val shutdown = system.actorOf(ShutDownActor.props)
val monitor = system.actorOf(MonitorActor.props(shutdown,timeoutInS))
val refs = (0 until Try(args(2).toInt).toOption.getOrElse(defaultNumPrinters)).map{x =>
val name = "logfile" + x
(name,system.actorOf(PrintActor.props(name,Try(args(3).toInt).toOption.getOrElse(defaultMonitorTickInterval),monitor)))
}
val root = system.actorOf(TraverseActor.props(new File(args(0)),refs))
implicit val askTimeout = Timeout(timeoutInS seconds)
var isTimedOut = false
while(!isTimedOut){
Thread.sleep(30000)
val fut = (shutdown ? ShutDownYet).mapTo[Boolean]
isTimedOut = Await.result(fut,timeoutInS seconds)
}
refs.foreach{ x =>
val fw = new BufferedWriter(new FileWriter(new File(x._1),true))
fw.write("{}\n]")
fw.close()
}
system.terminate
}
}
MonitorActor.scala
import MonitorActor.ShutDown
import akka.actor.{Actor,ActorRef,Props,ReceiveTimeout,Stash}
import io.circe.syntax._
import scala.concurrent.duration._
class MonitorActor(shutdownActor: ActorRef,timeoutInS: Int) extends Actor with Stash {
context.setReceiveTimeout(timeoutInS seconds)
override def receive: Receive = {
case ReceiveTimeout =>
shutdownActor ! ShutDown
}
}
object MonitorActor {
def props(shutdownActor: ActorRef,timeoutInS: Int) = Props(new MonitorActor(shutdownActor,timeoutInS))
case object ShutDown
}
PrintActor.scala
import java.io.{BufferedWriter,FileWriter,PrintWriter}
import akka.actor.{Actor,Stash}
import PrintActor.{Count,HeartBeat}
class PrintActor(name: String,interval: Int,monitorActor: ActorRef) extends Actor with Stash {
val file = new File(name)
override def preStart = {
val fw = new BufferedWriter(new FileWriter(file,true))
fw.write("[\n")
fw.close()
self ! Count(0)
}
override def receive: Receive = {
case Count(c) =>
context.become(withCount(c))
unstashAll()
case _ =>
stash()
}
def withCount(c: Int): Receive = {
case s: String =>
val fw = new BufferedWriter(new FileWriter(file,true))
fw.write(s)
fw.write(",\n")
fw.close()
if (c == interval) {
monitorActor ! HeartBeat
context.become(withCount(0))
} else {
context.become(withCount(c+1))
}
}
}
object PrintActor {
def props(name: String,monitorActor: ActorRef) = Props(new PrintActor(name,interval,monitorActor))
case class Count(count: Int)
case object HeartBeat
}
ShutDownActor.scala
import MonitorActor.ShutDown
import ShutDownActor.ShutDownYet
import akka.actor.{Actor,Stash}
class ShutDownActor() extends Actor with Stash {
override def receive: Receive = {
case ShutDownYet => sender ! false
case ShutDown => context.become(canShutDown())
}
def canShutDown(): Receive = {
case ShutDownYet => sender ! true
}
}
object ShutDownActor {
def props = Props(new ShutDownActor())
case object ShutDownYet
}
TraverseActor.scala
import java.io.File
import akka.actor.{Actor,PoisonPill,ReceiveTimeout}
import io.circe.syntax._
import scala.collection.JavaConversions
import scala.concurrent.duration._
import scala.util.Try
class TraverseActor(start: File,printers: IndexedSeq[(String,ActorRef)]) extends Actor{
val hash = start.hashCode()
val mod = hash % printers.size
val idx = if (mod < 0) -mod else mod
val myPrinter = printers(idx)._2
override def preStart = {
self ! start
}
override def receive: Receive = {
case f: File =>
val path = f.getCanonicalPath
val files = Option(f.list()).map(_.toIndexedSeq.map(x =>new File(path + "/" + x)))
val directories = files.map(_.filter(_.isDirectory))
directories.foreach(ds => processDirectories(ds))
val entities = files.map{fs =>
fs.map{ f =>
val path = f.getCanonicalPath
val owner = Try(java.nio.file.Files.getOwner(f.toPath).toString).toOption.getOrElse("")
val permissions = Try(java.nio.file.Files.getPosixFilePermissions(f.toPath).toString).toOption.getOrElse("")
val attributes = Try(java.nio.file.Files.readAttributes(f.toPath,"lastModifiedTime,creationTime,lastAccessTime"))
val lastModifiedTime = attributes.flatMap(a => Try(a.get("lastModifiedTime").toString)).toOption.getOrElse("")
val creationTime = attributes.flatMap(a => Try(a.get("creationTime").toString)).toOption.getOrElse("")
val lastAccessTime = attributes.flatMap(a => Try(a.get("lastAccessTime").toString)).toOption.getOrElse("")
if (f.isDirectory) FileEntity(path,owner,permissions,lastModifiedTime,lastAccessTime)
else DirectoryEntity(path,lastAccessTime)
}
}
directories match {
case Some(seq) =>
seq match {
case x+:xs =>
case IndexedSeq() => self ! PoisonPill
}
case None => self ! PoisonPill
}
entities.foreach(e => myPrinter ! Contents(f.getCanonicalPath,e).asJson.toString)
}
def processDirectories(directories: IndexedSeq[File]): Unit = {
def inner(fs: IndexedSeq[File]): Unit = {
fs match {
case x +: xs =>
context.actorOf(TraverseActor.props(x,printers))
processDirectories(xs)
case IndexedSeq() =>
}
}
directories match {
case x +: xs =>
self ! x
inner(xs)
case IndexedSeq() =>
}
}
}
object TraverseActor {
def props(start: File,ActorRef)]) = Props(new TraverseActor(start,printers))
}
我仅在一个小示例上进行了测试,因此该程序在运行您的工作时可能会遇到问题。如果发生这种情况,请随时提问。
本文链接:https://www.f2er.com/3130150.html