scala – 当child actor在未来的onFailure中抛出异常时,Akka主管演员不会处理异常

前端之家收集整理的这篇文章主要介绍了scala – 当child actor在未来的onFailure中抛出异常时,Akka主管演员不会处理异常前端之家小编觉得挺不错的,现在分享给大家,也给大家做个参考。
我正面临一个Akka主管演员的问题.当子actor在未来结果的onFailure方法中抛出异常时,主管不处理错误(我想在ConnectException的情况下重启子进程).

我正在使用Akka 2.3.7.

这是主管演员:

class MobileUsersActor extends Actor with ActorLogging {

  import Model.Implicits._
  import Model.MobileNotifications

  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3,withinTimeRange = 1 minute) {
      case _: java.net.ConnectException => {
        Logger.error("API connection error. Check your proxy configuration.")
        Restart
      }
    }

  def receive = {
    case Start => findMobileUsers
  }

  private def findMobileUsers = {
    val notis = MobileNotificationsRepository().find()
    notis.map(invokePushSender)
  }

  private def invokePushSender(notis: List[MobileNotifications]) = {
    notis.foreach { n =>
      val pushSender = context.actorOf(PushSenderActor.props)
      pushSender ! Send(n)
    }
  }

}

这是儿童演员:

class PushSenderActor extends Actor with ActorLogging {

  def receive = {
    case Send(noti) => {
      val response = sendPushNotification(noti) onFailure {
        case e: ConnectException => throw e
      }
    }
  }

  private def sendPushNotification(noti: MobileNotifications): Future[WSResponse] = {
    val message = "Push notification message example"
    Logger.info(s"Push Notification >> $message to users " + noti.users)
    PushClient.sendNotification(message,noti.users)
  }

}

我试图通过akka.actor.Status.Failure(e)通知发件人,如建议here,但没有工作,该例外由主管未处理.

作为一种解决方法,我找到了这种方式来实现它:

class PushSenderActor extends Actor with ActorLogging {

  def receive = {
    case Send(noti) => {
      val response = sendPushNotification(noti) onFailure {
        case e: ConnectException => self ! APIConnectionError
      }
    }
    case APIConnectionError => throw new ConnectException
  }

  private def sendPushNotification(noti: MobileNotifications): Future[WSResponse] = {
    val message = "Push notification message example"
    Logger.info(s"Push Notification >> $message to users " + noti.users)
    PushClient.sendNotification(message,noti.users)
  }

}

这是一个Akka错误还是我做错了什么?

谢谢!

解决方法

我认为问题在于,Future中抛出的异常与Actor运行的异常(可能)不属于同一个线程(更有经验的人可以详细说明).因此,问题是Future体内抛出的异常被“吞噬”而不会传播到Actor.由于这种情况,Actor不会失败,因此不需要应用监督策略.所以,我想到的第一个解决方案是在一些消息中将异常包装在Future中,发送给自己,然后从Actor上下文中抛出它.这一次,将捕获异常并应用监督策略.但请注意,除非您再次发送Send(noti)消息,否则您将看不到自Actor重新启动以来发生的异常.总而言之,代码将是这样的:

class PushSenderActor extends Actor with ActorLogging {

  case class SmthFailed(e: Exception)

  def receive = {
    case Send(noti) => {
      val response = sendPushNotification(noti) onFailure {
        case e: ConnectException => self ! SmthFailed(e) // send the exception to yourself
      }
    }

    case SmthFailed(e) =>
      throw e // this one will be caught by the supervisor
  }

  private def sendPushNotification(noti: MobileNotifications): Future[WSResponse] = {
    val message = "Push notification message example"
    Logger.info(s"Push Notification >> $message to users " + noti.users)
    PushClient.sendNotification(message,noti.users)
  }

}

希望它有所帮助.

猜你在找的Scala相关文章