Scala parallel execution

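I am working on a requirement to collect statistics about files stored on Linux, using Scala.

We will pass a root directory as input, and our code will get the complete list of subdirectories under that root directory.

Then, for each directory in the list, I will get the list of files, and for each file I will get the owner, group, permissions, lastModifiedTime, createdTime and lastAccessTime.

The problem is how to process the directory list in parallel to get the statistics of the files stored in each directory.

In the production environment, the root folder contains 100000+ folders.

So my list contains more than 100000 folders.

How can I parallelize my operation (collecting file statistics) over this list?

Since I am new to Scala, please help me with this requirement.

Sorry for posting without a code snippet.

Thanks.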
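For comparison with the actor-based answer below, here is a minimal sketch of the same fan-out using plain Scala Futures on a fixed-size thread pool, with no library beyond the JDK and the Scala standard library. It assumes you already have the list of directories as `java.nio.file.Path` values; `ParallelStats`, `ownerOf`, `statDirectory` and `statAll` are hypothetical names, and only the owner is collected to keep it short.

```scala
import java.nio.file.{Files, Path}
import java.util.concurrent.Executors

import scala.collection.JavaConverters._ // Scala 2.12, as in build.sbt
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Try

object ParallelStats {

  // Per-file work. Only the owner is read here to keep the sketch short;
  // group, permissions and timestamps would be collected the same way.
  def ownerOf(p: Path): String =
    Try(Files.getOwner(p).getName).getOrElse("")

  // Lists the regular files directly inside one directory and stats each one.
  // Throws if the directory cannot be listed.
  def statDirectory(dir: Path): Seq[(String, String)] = {
    val stream = Files.newDirectoryStream(dir)
    try stream.asScala.filter(p => Files.isRegularFile(p)).map(p => (p.toString, ownerOf(p))).toList
    finally stream.close()
  }

  // Runs statDirectory for every directory, at most `threads` at a time.
  // The one-hour overall limit is an arbitrary assumption.
  def statAll(dirs: Seq[Path], threads: Int): Seq[(String, String)] = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = dirs.map(d => Future(statDirectory(d)))
      Await.result(Future.sequence(futures), 1.hour).flatten
    } finally pool.shutdown()
  }
}
```

In this sketch a directory that cannot be listed makes its Future fail, and `Await.result` rethrows that failure, ending the whole run; with 100000+ directories you may prefer to wrap `statDirectory` in `Try` and report failures separately. The pool size is a tuning assumption: the work is mostly I/O-bound, so a pool somewhat larger than the core count may help, but that needs measuring.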

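Answer from nt890121: Scala parallel execution

I ended up using Akka actors.

I made some assumptions about your desired output to keep the program simple and fast: the output is JSON, the directory hierarchy is not preserved, and writing to multiple files is acceptable. If you don't like JSON you can replace it with something else, but the other two assumptions are important for keeping the program as fast and simple as it is now.

There are a few command-line arguments you can set. Any you leave out fall back to their default values, which are defined in Main.scala.

The command-line arguments are: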

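(0) The root directory to start from; (no default)

(1) The interval, in seconds, used for every timeout in the program; (default 60)

(2) The number of printer actors to use; this is also the number of log files created; (default 50)

(3) The tick interval used for monitoring: each printer actor sends a heartbeat to the monitor actor after roughly this many writes; (default 500)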

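About the timeout: it is also how long the program waits, once the work stops, before it shuts down. So if you run a small job and wonder why it takes a minute to finish, that is because it waits for the timeout interval before closing the program.

Since you are running such a large job, the default timeout of 60 seconds may be too small. If you get errors about timeouts, increase the timeout value.

Note that if the tick interval is set too high, the program may shut down prematurely.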
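Why a large tick interval can end the program early follows from how PrintActor and MonitorActor interact: each printer sends a HeartBeat after every tickInterval + 1 writes (its counter runs from 0 to tickInterval), and a message from any printer resets the monitor's receive timeout. On average, then, the gap between heartbeats is (tickInterval + 1) divided by the total number of writes per second, where one write is one directory's JSON line. A small sketch of that estimate; `HeartbeatCheck` and the write rates are hypothetical.

```scala
object HeartbeatCheck {

  // A PrintActor sends one HeartBeat per (tickInterval + 1) writes, because its
  // counter runs from 0 to tickInterval. A heartbeat from any printer resets the
  // monitor, so on average the gap depends only on the total write rate.
  def secondsBetweenHeartbeats(tickInterval: Int, totalWritesPerSecond: Double): Double =
    (tickInterval + 1) / totalWritesPerSecond

  // True when heartbeats are expected no more often than the timeout, i.e. the
  // monitor would likely report "finished" while directories are still being walked.
  def risksEarlyShutdown(tickInterval: Int, totalWritesPerSecond: Double, timeoutInS: Int): Boolean =
    secondsBetweenHeartbeats(tickInterval, totalWritesPerSecond) >= timeoutInS
}
```

For example, at a hypothetical 100 directories per second, the default tick interval of 500 gives a heartbeat about every 5 seconds, well inside the 60-second default timeout, while a tick interval of 10000 would leave about 100 seconds between heartbeats.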

To run it, start sbt in the project folder and type

runMain Main <canonical path of root directory>

I don't know how to get a file's group in Java. You will need to research that and add the relevant code to Entity.scala and TraverseActor.scala.
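On Linux (or any POSIX file system), java.nio exposes the group through `PosixFileAttributes.group()`, which returns a `GroupPrincipal`. Below is a minimal sketch of two equivalent lookups written in the same `Try(...).getOrElse("")` style that TraverseActor.scala uses. `GroupLookup`, `groupOf` and `groupOfByName` are hypothetical names, and a `group: String` field would still have to be added to FileEntity.

```scala
import java.nio.file.{Files, Path}
import java.nio.file.attribute.{GroupPrincipal, PosixFileAttributes}

import scala.util.Try

object GroupLookup {

  // Reads the group through the POSIX attribute view. Returns "" when the
  // file system is not POSIX or the file cannot be read, like the other
  // fields in TraverseActor.scala.
  def groupOf(p: Path): String =
    Try(Files.readAttributes(p, classOf[PosixFileAttributes]).group().getName).getOrElse("")

  // The same lookup via the attribute name "posix:group".
  def groupOfByName(p: Path): String =
    Try(Files.getAttribute(p, "posix:group").asInstanceOf[GroupPrincipal].getName).getOrElse("")
}
```

In TraverseActor.scala, either call would sit next to the existing owner lookup, taking the file's `toPath`.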

f.list() in TraverseActor.scala also sometimes returns null, which is why I wrapped it in an Option. You will need to debug that to make sure it is not silently failing on some files.
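`java.io.File.list()` returns null when the path is not a directory or when an I/O error occurs (for example, permission denied), so wrapping it in an Option avoids the crash but loses the reason. A sketch of a listing helper that keeps the reason, using `java.nio.file.Files.newDirectoryStream`, which throws an exception such as `NotDirectoryException` or `AccessDeniedException` instead; `SafeList.children` is a hypothetical name.

```scala
import java.nio.file.{Files, Path}

import scala.collection.JavaConverters._ // Scala 2.12, as in build.sbt
import scala.util.Try

object SafeList {

  // Returns the entries of `dir`, or a Failure carrying the exception that
  // explains why it could not be listed (missing, not a directory, no access).
  def children(dir: Path): Try[List[Path]] = Try {
    val stream = Files.newDirectoryStream(dir)
    try stream.asScala.toList
    finally stream.close()
  }
}
```

In TraverseActor one could match on `Failure(e)` and log `e` together with the path, so that unreadable directories show up in the output instead of being skipped silently.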

Here are the contents of all the files.

build.sbt

name := "stackoverflow20191110"

version := "0.1"

scalaVersion := "2.12.1"

libraryDependencies ++= Seq(
  "io.circe" %% "circe-core", "io.circe" %% "circe-generic", "io.circe" %% "circe-parser"
).map(_ % "0.12.2")

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.16"

Entity.scala

import io.circe.Encoder
import io.circe.generic.semiauto._

// Only the fields shared by both subtypes belong in the trait: DirectoryEntity
// has no owner, permissions, etc., so declaring them here would not compile.
sealed trait Entity {
  def path: String
  def lastAccessTime: String
}

object Entity {
  // Encodes each entity as {"FileEntity": {...}} or {"DirectoryEntity": {...}}.
  implicit val entityEncoder: Encoder[Entity] = deriveEncoder
}

case class FileEntity(path: String, owner: String, permissions: String, lastModifiedTime: String, creationTime: String, lastAccessTime: String) extends Entity

// The encoder must live in the companion object (same name as the case class)
// so that deriving Encoder[Entity] can find it implicitly.
object FileEntity {
  implicit val fileEntityEncoder: Encoder[FileEntity] = deriveEncoder
}

case class DirectoryEntity(path: String, lastAccessTime: String) extends Entity

object DirectoryEntity {
  implicit val directoryEntityEncoder: Encoder[DirectoryEntity] = deriveEncoder
}

case class Contents(path: String,files: IndexedSeq[Entity])

object Contents {
  implicit val contentsEncoder: Encoder[Contents] = deriveEncoder
}

Main.scala

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import java.io.{BufferedWriter,File,FileWriter}

import ShutDownActor.ShutDownYet

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object Main {

  val defaultNumPrinters = 50

  val defaultMonitorTickInterval = 500

  val defaultTimeoutInS = 60

  def main(args: Array[String]): Unit = {
    // Optional arguments fall back to the defaults above.
    def intArg(i: Int, default: Int): Int = Try(args(i).toInt).getOrElse(default)

    val timeoutInS = intArg(1, defaultTimeoutInS)
    val numPrinters = intArg(2, defaultNumPrinters)
    val tickInterval = intArg(3, defaultMonitorTickInterval)

    val system = ActorSystem("SearchHierarchy")

    val shutdown = system.actorOf(ShutDownActor.props)

    val monitor = system.actorOf(MonitorActor.props(shutdown, timeoutInS))

    // One PrintActor, and therefore one output file, per printer.
    val refs = (0 until numPrinters).map { x =>
      val name = "logfile" + x
      (name, system.actorOf(PrintActor.props(name, tickInterval, monitor)))
    }

    system.actorOf(TraverseActor.props(new File(args(0)), refs))

    implicit val askTimeout: Timeout = Timeout(timeoutInS.seconds)

    // Poll every 30 s until the monitor has reported that no heartbeat
    // arrived within the timeout.
    var isTimedOut = false

    while (!isTimedOut) {
      Thread.sleep(30000)
      val fut = (shutdown ? ShutDownYet).mapTo[Boolean]
      isTimedOut = Await.result(fut, timeoutInS.seconds)
    }

    // Close each JSON array; the empty "{}" absorbs the trailing comma
    // written after the last entry.
    refs.foreach { x =>
      val fw = new BufferedWriter(new FileWriter(new File(x._1), true))
      fw.write("{}\n]")
      fw.close()
    }

    system.terminate()
  }

}

MonitorActor.scala

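import MonitorActor.ShutDown
import PrintActor.HeartBeat
import akka.actor.{Actor, ActorRef, Props, ReceiveTimeout}

import scala.concurrent.duration._

class MonitorActor(shutdownActor: ActorRef, timeoutInS: Int) extends Actor {

  // Any received message, including HeartBeat, restarts this timer; if no
  // printer reports for timeoutInS seconds, the traversal is assumed finished.
  context.setReceiveTimeout(timeoutInS.seconds)

  override def receive: Receive = {
    case HeartBeat => // nothing to do: receiving it already reset the timeout
    case ReceiveTimeout =>
      shutdownActor ! ShutDown
  }

}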

object MonitorActor {
  def props(shutdownActor: ActorRef,timeoutInS: Int) = Props(new MonitorActor(shutdownActor,timeoutInS))

  case object ShutDown
}

PrintActor.scala

import java.io.{BufferedWriter, File, FileWriter}

import akka.actor.{Actor, ActorRef, Props, Stash}
import PrintActor.{Count, HeartBeat}

class PrintActor(name: String, interval: Int, monitorActor: ActorRef) extends Actor with Stash {

  val file = new File(name)

  override def preStart = {
    val fw = new BufferedWriter(new FileWriter(file,true))
    fw.write("[\n")
    fw.close()

    self ! Count(0)
  }

  override def receive: Receive = {
    case Count(c) =>
      context.become(withCount(c))
      unstashAll()

    case _ =>
      stash()
  }

  def withCount(c: Int): Receive = {
    case s: String =>
      val fw = new BufferedWriter(new FileWriter(file,true))
      fw.write(s)
      fw.write(",\n")
      fw.close()

      if (c == interval) {
        monitorActor ! HeartBeat
        context.become(withCount(0))
      } else {
        context.become(withCount(c+1))
      }
  }

}

object PrintActor {
  def props(name: String, interval: Int, monitorActor: ActorRef) = Props(new PrintActor(name, interval, monitorActor))

  case class Count(count: Int)

  case object HeartBeat
}

ShutDownActor.scala

import MonitorActor.ShutDown
import ShutDownActor.ShutDownYet
import akka.actor.{Actor, Props}

class ShutDownActor() extends Actor {

  // Answers false until the monitor reports a timeout, and true from then on.
  override def receive: Receive = {
    case ShutDownYet => sender() ! false
    case ShutDown => context.become(canShutDown())
  }

  def canShutDown(): Receive = {
    case ShutDownYet => sender() ! true
  }

}

object ShutDownActor {
  def props = Props(new ShutDownActor())

  case object ShutDownYet
}

TraverseActor.scala

import java.io.File

import akka.actor.{Actor, ActorRef, PoisonPill, Props}
import io.circe.syntax._

import scala.util.Try

class TraverseActor(start: File, printers: IndexedSeq[(String, ActorRef)]) extends Actor {

  val hash = start.hashCode()
  val mod = hash % printers.size
  val idx = if (mod < 0) -mod else mod
  val myPrinter = printers(idx)._2

  override def preStart = {
    self ! start
  }

  override def receive: Receive = {
    case dir: File =>
      val dirPath = dir.getCanonicalPath
      // File.list() returns null if dir is not a directory or cannot be read.
      val files = Option(dir.list()).map(_.toIndexedSeq.map(x => new File(dirPath + "/" + x)))

      val directories = files.map(_.filter(_.isDirectory))

      directories.foreach(ds => processDirectories(ds))

      val entities = files.map { fs =>
        fs.map { child =>
          val p = child.toPath
          val path = child.getCanonicalPath
          val owner = Try(java.nio.file.Files.getOwner(p).toString).getOrElse("")
          val permissions = Try(java.nio.file.Files.getPosixFilePermissions(p).toString).getOrElse("")
          val attributes = Try(java.nio.file.Files.readAttributes(p, "lastModifiedTime,creationTime,lastAccessTime"))
          val lastModifiedTime = attributes.flatMap(a => Try(a.get("lastModifiedTime").toString)).getOrElse("")
          val creationTime = attributes.flatMap(a => Try(a.get("creationTime").toString)).getOrElse("")
          val lastAccessTime = attributes.flatMap(a => Try(a.get("lastAccessTime").toString)).getOrElse("")

          // Directories get the short record, everything else the full one.
          if (child.isDirectory) DirectoryEntity(path, lastAccessTime)
          else FileEntity(path, owner, permissions, lastModifiedTime, creationTime, lastAccessTime)
        }
      }

      // With no subdirectories left to walk, this actor is done.
      if (directories.forall(_.isEmpty)) self ! PoisonPill

      entities.foreach(e => myPrinter ! Contents(dirPath, e).asJson.toString)
  }

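  // The first subdirectory is handled by this actor (sent to itself); every
  // other subdirectory gets its own TraverseActor, so they are walked in parallel.
  def processDirectories(directories: IndexedSeq[File]): Unit = {
    directories.headOption.foreach(d => self ! d)
    directories.drop(1).foreach { d =>
      // Created as a top-level actor: a child of this actor would be stopped
      // together with it when it receives PoisonPill, possibly before finishing.
      context.system.actorOf(TraverseActor.props(d, printers))
    }
  }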

}

object TraverseActor {
  def props(start: File, printers: IndexedSeq[(String, ActorRef)]) = Props(new TraverseActor(start, printers))
}

I have only tested this on a small example, so the program may run into problems on your job. If that happens, feel free to ask.

Article link: https://www.f2er.com/3130150.html
