我有一个实用程序类来集中处理Reactor的Publisher
来处理Cassandra查询:
public Mono<ResultSet> execute(Statement statement) {
return Mono.defer(() -> Mono.fromFuture(FutureConverter
.toCompletableFuture(session.executeAsync(statement)))
.publishOn(Schedulers.elastic()));
}
它可以很好地工作,因为它不会处理反压,最终会导致Cassandra池耗尽,从而导致错误。 有没有一种方法可以将订阅的并发性(将它们放入FIFO队列中)限制为给定的数量(该数量与池的大小匹配)?
谢谢。