我正在使用Flux构建我的反应性管道。在管道中,我需要调用3个不同的外部系统REST API,它们的访问率非常严格。 如果我突破了每秒速率阈值,我将成倍地受到节制。每个系统都有自己的阈值。
我正在使用Spring WebClient进行REST API调用;在3个API中,其中2个是GET,1个是POST。
在我的反应堆管道中,WebClient被包装在flatMap中以执行API调用,如以下代码所示:
WebClient getapiCall1 = WebClient.builder().build().get("api-system-1").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getapiCall2 = WebClient.builder().build().get("api-system-2").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getapiCall3 = WebClient.builder().build().get("api-system-3").retrieve().bodyToMono(String.class) //actual return DTO is different from string
Flux.generator(generator) // Generator pushes the elements from source 1 at a time
// make call to 1st API Service
.flatMap(data -> getapiCall1)
.map(api1Response -> api1ResponseModified)
// make call to 2nd API Service
.flatMap(api1ResponseModified -> getapiCall2)
.map(api2Response -> api2ResponseModified)
// make call to 3rd API Service
.flatMap(api2ResponseModified -> getapiCall3)
.map(api3Response -> api3ResponseModified)
// rest of the pipeline operators
//end
.subscriber();
问题是,如果我没有将concurrency
值设置为flatMap,那么在服务启动后的几秒钟内,我的管道执行就会违反阈值。
如果将concurrency
的值设置为1、2、5、10,则吞吐量将变得非常低。
问题是,在不为并发设置任何值的情况下,如何实现应满足外部系统速率限制的背压?