阻止Spring WebFlux WebClient对新订阅执行新的交换

DefaultWebClient的{​​{1}}实现为:

exchange

如上所示,@Override public Mono<ClientResponse> exchange() { ClientRequest request = (this.inserter != null ? initRequestBuilder().body(this.inserter).build() : initRequestBuilder().build()); return Mono.defer(() -> exchangeFunction.exchange(request) .checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]") .switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR)); } 调用由exchangeFunction.exchange包裹,因此只要有订阅返回的Mono.defer的调用都会执行。

但是,在我的特定用例中,鉴于以下简化代码,我不想重新执行交换:

Mono<ClientResponse>

正如您在我的用例中所见,我尝试过使用final WebClient webClient = WebClient.create("http://some-base-url"); final Atomicreference<Mono<ClientResponse>> responseRef = new Atomicreference<>(null); Flux.fromIterable(Arrays.asList(1,2,3)) .flatMap(num -> { if (...some condition...) { return responseRef.updateAndGet(response -> response == null ? webClient.get().uri("/some-path").exchange() : response) .flatMap(response -> {...do something with num and response...}); } else { return Mono.just(...something...); } }) ... 来延迟获取Atomicreference,这样就不会一次又一次地发出HTTP请求。

这不符合预期,因为订阅由Mono<ClientResponse>发布的flatMap的“用数字做响应” Mono<ClientResponse>会触发其内部{{1} }一遍又一遍。

我可以在发布的exchange()上包裹一些内容,以抵消exchangeFunction.exchange的影响吗?还是在不更改用例代码结构的情况下解决该问题?

==========可行的解决方案==========

受接受的答案启发,我将代码更改为如下工作:

Mono<ClientResponse>

Mono.defer之后注意final WebClient webClient = WebClient.create("http://some-base-url"); final Atomicreference<Mono<ClientResponse>> responseRef = new Atomicreference<>(null); Flux.fromIterable(Arrays.asList(1,3)) .flatMap(num -> { if (...some condition...) { return responseRef.updateAndGet(response -> response == null ? webClient.get().uri("/some-path").exchange().cache() : response) .flatMap(response -> {...do something with num and response...}); } else { return Mono.just(...something...); } }) ... cache()的缓存将其变为热源,并缓存最后发出的信号以供其他订户使用。完成和错误也将被重播。

jenny013 回答:阻止Spring WebFlux WebClient对新订阅执行新的交换

您可以执行以下操作:

final WebClient webClient = WebClient.create("http://localhost:8080");
Flux<String> data = webClient
                .get()
                .uri("test")
                .exchange()
                //do whatever you need on response
                .flatMap(clientResponse -> clientResponse.bodyToMono(String.class))
                .flux()
                //Turn this Flux into a hot source and cache last emitted signals for further Subscriber
                .replay()
                //Connects this ConnectableFlux to the upstream source when the first Subscriber subscribes.
                .autoConnect();

Flux.range(0,10).flatMap(integer -> data).log().subscribe();

您可以这样做:

 Mono<String> data = webClient
                .get()
                .uri("test")
                .exchange()
                .flatMap(clientResponse -> clientResponse.bodyToMono(String.class))
                .cache();


 Flux.range(0,10).flatMap(integer -> {
        if (integer % 2 == 0)
            return data;
        else
            return Mono.empty();
    }).log().subscribe();
本文链接:https://www.f2er.com/2580118.html

大家都在问