我正在创建一个Akka Http路由,该路由使用WebSocket将无限量的TextMessage流传输到客户端。我忽略了来自客户端的消息,并发回了消息流。
为简单起见,我已经创建了一个简化的代码版本。问题在于,在客户端发出初始请求后,客户端未收到任何消息。
// Using the Source.queue component to "inject" data into the Source
val queueSource: Source[TextMessage,SourceQueueWithComplete[String]] =
Source
.queue[String](1024,OverflowStrategy.backpressure)
.map(quoteStr => TextMessage(quoteStr.toString))
// This simulates messages received from an external system being
// feed into the Source
Future {
val queue = queueSource.preMaterialize()._1
val rng = Random
while(true) {
Thread.sleep(500)
queue.offer(rng.nextString(20))
}
}
val route =
path("api" / "stream") {
handleWebSocketMessages(
Flow.fromSinkAndSource(
Sink.ignore,queueSource
)
)
}
Http().bindAndHandle(route,"0.0.0.0",8081)
如果我将queueSource
替换为Source(1 to 10).throttle(2,1 second
,一切正常。