在流中使用Source.queues和Sink.queues的Akka Streams应用程序中无法处理项目

我正在尝试使用akka.stream.scaladsl.Source和Sink队列类创建(Akka HTTP)流处理流。 我正在使用队列,因为我的流程中有一个处理步骤会发出http请求,因此我希望此步骤能执行 因为有max-open-requests,所以这些项目不在队列中,一旦max-open-requests在运行,就停止从队列中取出。 结果是当我的连接池过载时会施加背压。

下面,我有一个非常简化的测试,它反映了我应用程序的主要逻辑。在测试“压力规格”中(如下) 我正在模拟多个同时连接,通过这些连接我将发送“ Requesto”对象的“源” 到类ServiceImpl的getResponses方法。 在处理步骤“ pullOffSinkQueue”中,您将注意到我正在增加一个计数器以查看有多少个项目 我离开了队列。

该测试将向Serviceimpl发送一组基数设置为相等的请求 streamedRequestsPerConnection * numSimultaneousConnections。

当我发送20个请求时,我的测试通过了。特别是请求数量 Sink.queue等于我发出的请求数。但是,如果 我将发送的请求数量增加到50个左右,在测试中看到一致的失败。 我收到如下消息

180 was not equal to 200
ScalaTestFailureLocation: com.foo.stressSpec at (stressSpec.scala:116)
Expected :200
actual   :180
<Click to see difference>

这表示从队列中取出的项目数不等于放入队列的项目数。 我感觉这可能是由于我的测试未正确等待所有放入流中的项目所致 待处理。如果有人有任何建议,我会全力以赴!代码在下面。

package com.foo

import java.util.concurrent.atomic.AtomicInteger

import akka.stream.actorAttributes.supervisionStrategy
import akka.stream.{Attributes,Materializer,QueueOfferResult}
import akka.stream.Supervision.resumingDecider
import akka.stream.scaladsl.{Flow,Keep,Sink,Source}

import scala.concurrent.{ExecutionContext,Future}


import akka.NotUsed
import akka.actor.actorSystem
import akka.event.{Logging,LoggingAdapter}
import akka.stream.actorMaterializer
import akka.stream.scaladsl.{Sink,Source}
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{FunSuite,Matchers}

import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await,Future,_}

final case class Responso()
final case class Requesto()

object Handler {
  val dbRequestCounter = new AtomicInteger(0)
}

class Handler(implicit ec: ExecutionContext,mat: Materializer) {
  import Handler._

  private val source =
    Source.queue[(Requesto,String)](8,akka.stream.OverflowStrategy.backpressure)
  private val sink =
    Sink.queue[(Requesto,String)]().withAttributes(Attributes.inputBuffer(8,8))
  private val (sourceQueue,sinkQueue) = source.toMat(sink)(Keep.both).run()

  def placeonSourceQueue(ar: Requesto): Future[QueueOfferResult] = {
    sourceQueue.offer((ar,"foo"))
  }

  def pullOffSinkQueue(qofr: QueueOfferResult): Future[Responso] = {
    dbRequestCounter.incrementAndGet()
    qofr match {
      case QueueOfferResult.Enqueued =>
        sinkQueue.pull().flatMap { mayberequestPair: Option[(Requesto,String)] =>
          Future.successful(Responso())
        }
      case error =>
        println("enqueuing error: " + error)
        Future.failed(new RuntimeException("enqueuing error: " + error))
    }
  }
}


class ServiceImpl(readHandler: Handler,writeHandler: Handler)
                 (implicit log: LoggingAdapter,mat: Materializer) {
  private val readAttributeFlow: Flow[Requesto,Responso,NotUsed] = {
    Flow[Requesto]
      .mapAsyncUnordered(1)(readHandler.placeonSourceQueue)
      .mapAsyncUnordered(1)(readHandler.pullOffSinkQueue)
  }

  def getResponses(request: Source[Requesto,NotUsed]): Source[Responso,NotUsed] =
    request
      .via(readAttributeFlow)
      .withAttributes(supervisionStrategy(resumingDecider))

}


class stressSpec
    extends FunSuite
    with MockitoSugar
    with Matchers {

  val streamedRequestsPerConnection  = 10
  val numSimultaneousConnections  = 20

  implicit val actorSystem: actorSystem = actorSystem()
  implicit val materializer: actorMaterializer = actorMaterializer()
  implicit val log: LoggingAdapter = Logging(actorSystem.eventStream,"test")
  implicit val ec: ExecutionContext = actorSystem.dispatcher

  import Handler._

  lazy val requestHandler = new Handler()
  lazy val svc: ServiceImpl =
    new ServiceImpl(requestHandler,requestHandler)

  test("can handle lots of simultaneous read requests") {
    val totalExpected  = streamedRequestsPerConnection * numSimultaneousConnections

    def sendRequestAndAwaitResponse(): Unit = {
      def getResponses(i: Integer) = {
        val requestStream: Source[Requesto,NotUsed] =
          Source(1 to streamedRequestsPerConnection)
            .map { i =>
              Requesto()
            }
        svc.getResponses(requestStream).runWith(Sink.seq)
      }

      val responses: immutable.Seq[Future[immutable.Seq[Responso]]] =
        (1 to numSimultaneousConnections).map { getResponses(_) }
      val flattenedResponses: Future[immutable.Seq[Responso]] =
        Future.sequence(responses).map(_.flatten)
      Await.ready(flattenedResponses,1000.seconds).value.get
    }

    sendRequestAndAwaitResponse()
    dbRequestCounter.get shouldBe(totalExpected)

  }
}
maxiuping 回答:在流中使用Source.queues和Sink.queues的Akka Streams应用程序中无法处理项目

暂时没有好的解决方案,如果你有好的解决方案,请发邮件至:iooj@foxmail.com
本文链接:https://www.f2er.com/2987923.html

大家都在问