Reactor Flux flatMap操作员吞吐量/并发控制并实现背压

我正在使用Flux构建我的反应性管道。在管道中,我需要调用3个不同的外部系统REST API,它们的访问率非常严格。 如果我突破了每秒速率阈值,我将成倍地受到节制。每个系统都有自己的阈值。

我正在使用Spring WebClient进行REST API调用;在3个API中,其中2个是GET,1个是POST。

在我的反应堆管道中,WebClient被包装在flatMap中以执行API调用,如以下代码所示:

WebClient getapiCall1 = WebClient.builder().build().get("api-system-1").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getapiCall2 = WebClient.builder().build().get("api-system-2").retrieve().bodyToMono(String.class) //actual return DTO is different from string
WebClient getapiCall3 = WebClient.builder().build().get("api-system-3").retrieve().bodyToMono(String.class) //actual return DTO is different from string

    Flux.generator(generator) // Generator pushes the elements from source 1 at a time

    // make call to 1st API Service
    .flatMap(data -> getapiCall1)
    .map(api1Response -> api1ResponseModified)

    // make call to 2nd API Service
    .flatMap(api1ResponseModified -> getapiCall2)
    .map(api2Response -> api2ResponseModified)

// make call to 3rd API Service
.flatMap(api2ResponseModified -> getapiCall3)
.map(api3Response -> api3ResponseModified)

// rest of the pipeline operators

//end
.subscriber();

问题是,如果我没有将concurrency值设置为flatMap,那么在服务启动后的几秒钟内,我的管道执行就会违反阈值。 如果将concurrency的值设置为1、2、5、10,则吞吐量将变得非常低。

问题是,在不为并发设置任何值的情况下,如何实现应满足外部系统速率限制的背压?

iCMS 回答:Reactor Flux flatMap操作员吞吐量/并发控制并实现背压

鉴于您有“每秒速率”的要求,我将明确地对通量进行窗口化,并将每个窗口限制为选定的时间段。这将为您提供最大的吞吐量,而不会受到限制。

我将使用类似于以下的辅助功能

public static <T> Flux<T> limitIntervalRate(Flux<T> flux,int ratePerInterval,Duration interval) {
    return flux
            .window(ratePerInterval)
            .zipWith(Flux.interval(Duration.ZERO,interval))
            .flatMap(Tuple2::getT1);
}

您可以这样做:

sourceFlux
        .transform(f -> limitIntervalRate(f,2,Duration.ofSeconds(1))) //Limit to a rate of 2 per second

然后您可以根据需要将其映射到您的WebClient调用中,同时遵守每个API的限制:

sourceFlux
        //...assume API 1 has a limit of 10 calls per second
        .transform(f -> limitIntervalRate(f,10,Duration.ofSeconds(1)))
        .flatMap(data -> getApiCall1)
        .map(api1Response -> api1ResponseModified)

        //...assume API 2 has a limit of 20 calls per second
        .transform(f -> limitIntervalRate(f,20,Duration.ofSeconds(1))) 
        .flatMap(api1ResponseModified -> getApiCall2)
        .map(api2Response -> api2ResponseModified)

...等等。

,

Resilience4j支持Reactor的速率限制。 参见:

https://resilience4j.readme.io/docs/ratelimiter

https://resilience4j.readme.io/docs/examples-1#section-decorate-mono-or-flux-with-a-ratelimiter

本文链接:https://www.f2er.com/2220917.html

大家都在问