我正在尝试使用akka.stream.scaladsl.Source和Sink队列类创建(Akka HTTP)流处理流。 我正在使用队列,因为我的流程中有一个处理步骤会发出http请求,因此我希望此步骤能执行 因为有max-open-requests,所以这些项目不在队列中,一旦max-open-requests在运行,就停止从队列中取出。 结果是当我的连接池过载时会施加背压。
下面,我有一个非常简化的测试,它反映了我应用程序的主要逻辑。在测试“压力规格”中(如下) 我正在模拟多个同时连接,通过这些连接我将发送“ Requesto”对象的“源” 到类ServiceImpl的getResponses方法。 在处理步骤“ pullOffSinkQueue”中,您将注意到我正在增加一个计数器以查看有多少个项目 我离开了队列。
该测试将向Serviceimpl发送一组基数设置为相等的请求 streamedRequestsPerConnection * numSimultaneousConnections。
当我发送20个请求时,我的测试通过了。特别是请求数量 Sink.queue等于我发出的请求数。但是,如果 我将发送的请求数量增加到50个左右,在测试中看到一致的失败。 我收到如下消息
180 was not equal to 200
ScalaTestFailureLocation: com.foo.stressSpec at (stressSpec.scala:116)
Expected :200
actual :180
<Click to see difference>
这表示从队列中取出的项目数不等于放入队列的项目数。 我感觉这可能是由于我的测试未正确等待所有放入流中的项目所致 待处理。如果有人有任何建议,我会全力以赴!代码在下面。
package com.foo
import java.util.concurrent.atomic.AtomicInteger
import akka.stream.actorAttributes.supervisionStrategy
import akka.stream.{Attributes,Materializer,QueueOfferResult}
import akka.stream.Supervision.resumingDecider
import akka.stream.scaladsl.{Flow,Keep,Sink,Source}
import scala.concurrent.{ExecutionContext,Future}
import akka.NotUsed
import akka.actor.actorSystem
import akka.event.{Logging,LoggingAdapter}
import akka.stream.actorMaterializer
import akka.stream.scaladsl.{Sink,Source}
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{FunSuite,Matchers}
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await,Future,_}
final case class Responso()
final case class Requesto()
object Handler {
val dbRequestCounter = new AtomicInteger(0)
}
class Handler(implicit ec: ExecutionContext,mat: Materializer) {
import Handler._
private val source =
Source.queue[(Requesto,String)](8,akka.stream.OverflowStrategy.backpressure)
private val sink =
Sink.queue[(Requesto,String)]().withAttributes(Attributes.inputBuffer(8,8))
private val (sourceQueue,sinkQueue) = source.toMat(sink)(Keep.both).run()
def placeonSourceQueue(ar: Requesto): Future[QueueOfferResult] = {
sourceQueue.offer((ar,"foo"))
}
def pullOffSinkQueue(qofr: QueueOfferResult): Future[Responso] = {
dbRequestCounter.incrementAndGet()
qofr match {
case QueueOfferResult.Enqueued =>
sinkQueue.pull().flatMap { mayberequestPair: Option[(Requesto,String)] =>
Future.successful(Responso())
}
case error =>
println("enqueuing error: " + error)
Future.failed(new RuntimeException("enqueuing error: " + error))
}
}
}
class ServiceImpl(readHandler: Handler,writeHandler: Handler)
(implicit log: LoggingAdapter,mat: Materializer) {
private val readAttributeFlow: Flow[Requesto,Responso,NotUsed] = {
Flow[Requesto]
.mapAsyncUnordered(1)(readHandler.placeonSourceQueue)
.mapAsyncUnordered(1)(readHandler.pullOffSinkQueue)
}
def getResponses(request: Source[Requesto,NotUsed]): Source[Responso,NotUsed] =
request
.via(readAttributeFlow)
.withAttributes(supervisionStrategy(resumingDecider))
}
class stressSpec
extends FunSuite
with MockitoSugar
with Matchers {
val streamedRequestsPerConnection = 10
val numSimultaneousConnections = 20
implicit val actorSystem: actorSystem = actorSystem()
implicit val materializer: actorMaterializer = actorMaterializer()
implicit val log: LoggingAdapter = Logging(actorSystem.eventStream,"test")
implicit val ec: ExecutionContext = actorSystem.dispatcher
import Handler._
lazy val requestHandler = new Handler()
lazy val svc: ServiceImpl =
new ServiceImpl(requestHandler,requestHandler)
test("can handle lots of simultaneous read requests") {
val totalExpected = streamedRequestsPerConnection * numSimultaneousConnections
def sendRequestAndAwaitResponse(): Unit = {
def getResponses(i: Integer) = {
val requestStream: Source[Requesto,NotUsed] =
Source(1 to streamedRequestsPerConnection)
.map { i =>
Requesto()
}
svc.getResponses(requestStream).runWith(Sink.seq)
}
val responses: immutable.Seq[Future[immutable.Seq[Responso]]] =
(1 to numSimultaneousConnections).map { getResponses(_) }
val flattenedResponses: Future[immutable.Seq[Responso]] =
Future.sequence(responses).map(_.flatten)
Await.ready(flattenedResponses,1000.seconds).value.get
}
sendRequestAndAwaitResponse()
dbRequestCounter.get shouldBe(totalExpected)
}
}