Vertx WebClient响应正文中的反应流发布者

我正在尝试为Vertx web-client写一个包装器,以使用来自反应流的Publisher从服务器加载响应主体:

import org.reactivestreams.Publisher;
import io.vertx.reactivex.ext.web.client.WebClient;

interface Storage {
  Publisher<ByteBuffer> load(String key);
}

class WebStorage implements Storage {
  private final WebClient client;

  public WebStorage(final WebClient client) {
    this.client = client;
  }

  @Override
  public Publisher<ByteBuffer> laod(final String key) {
    return client.get(String.format("https://myhost/path?query=%s",key))
      .rxSend()
      .toFlowable()
      .map(resp -> ByteBuffer.wrap(resp.body().getBytes()));
  }
}

此解决方案是不正确的,因为它通过getBytes()调用以阻塞方式读取所有主体字节。

是否可以按块读取Vertx WebClient的响应并将其转换为Publisher(或Rx Flowable)?

yeqishy 回答:Vertx WebClient响应正文中的反应流发布者

Vert.x Web客户端并非旨在流式传输响应主体。它会按设计缓冲内容。

如果要流式传输内容,则可以使用更灵活的基础HTTP客户端。

,

我想您可以使用ByteCodec.pipe

import io.reactivex.Flowable;
import io.vertx.ext.reactivestreams.ReactiveWriteStream;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.streams.WriteStream;
import io.vertx.reactivex.ext.web.client.WebClient;
import io.vertx.reactivex.ext.web.codec.BodyCodec;
import org.reactivestreams.Publisher;

import java.nio.ByteBuffer;

interface Storage {
    Publisher<ByteBuffer> load(String key);
}

class WebStorage implements Storage {
    private final Vertx vertx = Vertx.vertx();
    private final WebClient client;

    public WebStorage(final WebClient client) {
        this.client = client;
    }

    @Override
    public Publisher<ByteBuffer> load(final String key) {
        final ReactiveWriteStream<Buffer> stream = ReactiveWriteStream.writeStream(vertx.getDelegate());
        client.get(String.format("https://myhost/path?query=%s",key))
            .as(BodyCodec.pipe(WriteStream.newInstance(stream)))
            .rxSend().subscribe();
        return Flowable.fromPublisher(stream).map(buffer -> ByteBuffer.wrap(buffer.getBytes()));
    }
}
本文链接:https://www.f2er.com/2576680.html

大家都在问