FluxSink: Example and Analysis

This article takes a closer look at how FluxSink works.

FluxSink

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxSink.java

    /**
     * Wrapper API around a downstream Subscriber for emitting any number of
     * next signals followed by zero or one onError/onComplete.
     * <p>
     * @param <T> the value type
     */
    public interface FluxSink<T> {

        /**
         * @see Subscriber#onComplete()
         */
        void complete();

        /**
         * Return the current subscriber {@link Context}.
         * <p>
         * {@link Context} can be enriched via {@link Flux#subscriberContext(Function)}
         * operator or directly by a child subscriber overriding
         * {@link CoreSubscriber#currentContext()}
         *
         * @return the current subscriber {@link Context}.
         */
        Context currentContext();

        /**
         * @see Subscriber#onError(Throwable)
         * @param e the exception to signal, not null
         */
        void error(Throwable e);

        /**
         * Try emitting, might throw an unchecked exception.
         * @see Subscriber#onNext(Object)
         * @param t the value to emit, not null
         */
        FluxSink<T> next(T t);

        /**
         * The current outstanding request amount.
         * @return the current outstanding request amount
         */
        long requestedFromDownstream();

        /**
         * Returns true if the downstream cancelled the sequence.
         * @return true if the downstream cancelled the sequence
         */
        boolean isCancelled();

        /**
         * Attaches a {@link LongConsumer} to this {@link FluxSink} that will be notified of
         * any request to this sink.
         * <p>
         * For push/pull sinks created using {@link Flux#create(java.util.function.Consumer)}
         * or {@link Flux#create(java.util.function.Consumer, FluxSink.OverflowStrategy)},
         * the consumer is invoked for every request to enable a hybrid backpressure-enabled push/pull model.
         * When bridging with asynchronous listener-based APIs, the {@code onRequest} callback
         * may be used to request more data from source if required and to manage backpressure
         * by delivering data to sink only when requests are pending.
         * <p>
         * For push-only sinks created using {@link Flux#push(java.util.function.Consumer)}
         * or {@link Flux#push(java.util.function.Consumer, FluxSink.OverflowStrategy)},
         * the consumer is invoked with an initial request of {@code Long.MAX_VALUE} when this method
         * is invoked.
         *
         * @param consumer the consumer to invoke on each request
         * @return {@link FluxSink} with a consumer that is notified of requests
         */
        FluxSink<T> onRequest(LongConsumer consumer);

        /**
         * Associates a disposable resource with this FluxSink
         * that will be disposed in case the downstream cancels the sequence
         * via {@link org.reactivestreams.Subscription#cancel()}.
         * @param d the disposable callback to use
         * @return the {@link FluxSink} with resource to be disposed on cancel signal
         */
        FluxSink<T> onCancel(Disposable d);

        /**
         * Associates a disposable resource with this FluxSink
         * that will be disposed on the first terminate signal which may be
         * a cancel, complete or error signal.
         * @param d the disposable callback to use
         * @return the {@link FluxSink} with resource to be disposed on first terminate signal
         */
        FluxSink<T> onDispose(Disposable d);

        /**
         * Enumeration for backpressure handling.
         */
        enum OverflowStrategy {
            /**
             * Completely ignore downstream backpressure requests.
             * <p>
             * This may yield {@link IllegalStateException} when queues get full downstream.
             */
            IGNORE,
            /**
             * Signal an {@link IllegalStateException} when the downstream can't keep up
             */
            ERROR,
            /**
             * Drop the incoming signal if the downstream is not ready to receive it.
             */
            DROP,
            /**
             * Downstream will get only the latest signals from upstream.
             */
            LATEST,
            /**
             * Buffer all signals if the downstream can't keep up.
             * <p>
             * Warning! This does unbounded buffering and may lead to {@link OutOfMemoryError}.
             */
            BUFFER
        }
    }
Note that OverflowStrategy.BUFFER uses an unbounded queue, so the possibility of an OutOfMemoryError deserves extra attention.
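
If the unbounded buffering of BUFFER is a concern, a different strategy can be passed to Flux.create. A minimal sketch (the class name and values are illustrative, not from the original example), assuming reactor-core 3.1.x:

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxSink;

    public class OverflowStrategyDemo {
        public static void main(String[] args) {
            // Same kind of create() call as in the example below, but with LATEST instead of
            // BUFFER: when the downstream cannot keep up, only the most recent value is kept,
            // so no unbounded queue can build up.
            Flux<Integer> flux = Flux.<Integer>create(sink -> {
                for (int i = 0; i < 1000; i++) {
                    sink.next(i);
                }
                sink.complete();
            }, FluxSink.OverflowStrategy.LATEST);

            flux.subscribe(v -> System.out.println("got " + v));
        }
    }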

Example

    public static void main(String[] args) throws InterruptedException {
        final Flux<Integer> flux = Flux.<Integer>create(fluxSink -> {
            //NOTE sink:class reactor.core.publisher.FluxCreate$SerializedSink
            LOGGER.info("sink:{}", fluxSink.getClass());
            while (true) {
                LOGGER.info("sink next");
                fluxSink.next(ThreadLocalRandom.current().nextInt());
            }
        }, FluxSink.OverflowStrategy.BUFFER);

        //NOTE flux:class reactor.core.publisher.FluxCreate, prefetch:-1
        LOGGER.info("flux:{},prefetch:{}", flux.getClass(), flux.getPrefetch());

        flux.subscribe(e -> {
            LOGGER.info("subscribe:{}", e);
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        });

        TimeUnit.MINUTES.sleep(20);
    }
Here create produces a reactor.core.publisher.FluxCreate, and the sink handed to the lambda is a reactor.core.publisher.FluxCreate$SerializedSink.
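
For comparison, Flux.push builds the same FluxCreate operator but in push-only mode; as the FluxCreate.subscribe code later in this article suggests, a push-only sink is handed to the lambda without the SerializedSink wrapper. A small sketch to observe the difference (the printed class name is an expectation based on the 3.1.3 sources, not output from the original article):

    import reactor.core.publisher.Flux;

    public class PushVsCreateDemo {
        public static void main(String[] args) {
            // Flux.push uses CreateMode.PUSH_ONLY, so unlike Flux.create the sink passed to the
            // lambda is expected to be the bare FluxCreate$BufferAsyncSink rather than the
            // FluxCreate$SerializedSink wrapper -- push-only sinks assume a single producer thread.
            Flux.<Integer>push(sink -> {
                System.out.println("push sink: " + sink.getClass());
                sink.next(1);
                sink.complete();
            }).subscribe(v -> System.out.println("got " + v));
        }
    }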

Flux.subscribe

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/Flux.java

    /**
     * Subscribe {@link Consumer} to this {@link Flux} that will respectively consume all the
     * elements in the sequence, handle errors, react to completion, and request upon subscription.
     * It will let the provided {@link Subscription subscriptionConsumer}
     * request the adequate amount of data, or request unbounded demand
     * {@code Long.MAX_VALUE} if no such consumer is provided.
     * <p>
     * For a passive version that observe and forward incoming data see {@link #doOnNext(java.util.function.Consumer)},
     * {@link #doOnError(java.util.function.Consumer)}, {@link #doOnComplete(Runnable)}
     * and {@link #doOnSubscribe(Consumer)}.
     * <p>For a version that gives you more control over backpressure and the request, see
     * {@link #subscribe(Subscriber)} with a {@link BaseSubscriber}.
     * <p>
     * Keep in mind that since the sequence can be asynchronous, this will immediately
     * return control to the calling thread. This can give the impression the consumer is
     * not invoked when executing in a main thread or a unit test for instance.
     *
     * <p>
     * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/subscribecomplete.png" alt="">
     *
     * @param consumer the consumer to invoke on each value
     * @param errorConsumer the consumer to invoke on error signal
     * @param completeConsumer the consumer to invoke on complete signal
     * @param subscriptionConsumer the consumer to invoke on subscribe signal, to be used
     * for the initial {@link Subscription#request(long) request}, or null for max request
     *
     * @return a new {@link Disposable} that can be used to cancel the underlying {@link Subscription}
     */
    public final Disposable subscribe(
            @Nullable Consumer<? super T> consumer,
            @Nullable Consumer<? super Throwable> errorConsumer,
            @Nullable Runnable completeConsumer,
            @Nullable Consumer<? super Subscription> subscriptionConsumer) {
        return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,
                completeConsumer, subscriptionConsumer));
    }

    @Override
    public final void subscribe(Subscriber<? super T> actual) {
        onLastAssembly(this).subscribe(Operators.toCoreSubscriber(actual));
    }
This creates a LambdaSubscriber, and the call eventually reaches FluxCreate.subscribe.
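
Note that the overload shown above also takes a subscriptionConsumer: when it is non-null, LambdaSubscriber does not request Long.MAX_VALUE but lets that consumer issue the initial request. A sketch of requesting a bounded amount (the numbers are illustrative):

    import reactor.core.publisher.Flux;

    public class BoundedRequestDemo {
        public static void main(String[] args) {
            Flux.range(1, 100).subscribe(
                    v -> System.out.println("next: " + v),    // consumer
                    Throwable::printStackTrace,               // errorConsumer
                    () -> System.out.println("done"),         // completeConsumer
                    subscription -> subscription.request(5)); // subscriptionConsumer
            // Only 5 elements are delivered, because LambdaSubscriber.onSubscribe delegates
            // the initial request to the subscriptionConsumer instead of Long.MAX_VALUE.
        }
    }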

FluxCreate.subscribe

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxCreate.java

    public void subscribe(CoreSubscriber<? super T> actual) {
        BaseSink<T> sink = createSink(actual, backpressure);

        actual.onSubscribe(sink);
        try {
            source.accept(
                    createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :
                            sink);
        }
        catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            sink.error(Operators.onOperatorError(ex, actual.currentContext()));
        }
    }

    static <T> BaseSink<T> createSink(CoreSubscriber<? super T> t, OverflowStrategy backpressure) {
        switch (backpressure) {
            case IGNORE: {
                return new IgnoreSink<>(t);
            }
            case ERROR: {
                return new ErrorAsyncSink<>(t);
            }
            case DROP: {
                return new DropAsyncSink<>(t);
            }
            case LATEST: {
                return new LatestAsyncSink<>(t);
            }
            default: {
                return new BufferAsyncSink<>(t, Queues.SMALL_BUFFER_SIZE);
            }
        }
    }
The sink is created first (a BufferAsyncSink in this example), then LambdaSubscriber.onSubscribe is invoked.
After that source.accept is called, i.e. the lambda passed to Flux.create starts producing data and the stream is under way.
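
The FluxSink javadoc quoted at the beginning also mentions that, for sinks created with Flux.create, onRequest can be used to build a hybrid backpressure-aware push/pull model: instead of pushing in a busy loop, the source emits only when demand arrives. A sketch of that idea (the counter-based source is hypothetical):

    import java.util.concurrent.atomic.AtomicLong;
    import reactor.core.publisher.Flux;

    public class OnRequestDemo {
        public static void main(String[] args) {
            AtomicLong counter = new AtomicLong();
            Flux<Long> flux = Flux.create(sink ->
                    // The LongConsumer is invoked with every request amount coming from
                    // downstream, so values are produced only when they have been asked for.
                    sink.onRequest(n -> {
                        for (long i = 0; i < n && !sink.isCancelled(); i++) {
                            sink.next(counter.incrementAndGet());
                        }
                    }));

            flux.subscribe(
                    v -> System.out.println("next: " + v),
                    Throwable::printStackTrace,
                    () -> System.out.println("done"),
                    subscription -> subscription.request(3)); // pull only 3 values
        }
    }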

LambdaSubscriber.onSubscribe

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/LambdaSubscriber.java

    public final void onSubscribe(Subscription s) {
        if (Operators.validate(subscription, s)) {
            this.subscription = s;
            if (subscriptionConsumer != null) {
                try {
                    subscriptionConsumer.accept(s);
                }
                catch (Throwable t) {
                    Exceptions.throwIfFatal(t);
                    s.cancel();
                    onError(t);
                }
            }
            else {
                s.request(Long.MAX_VALUE);
            }
        }
    }
This in turn calls request(Long.MAX_VALUE) on the BufferAsyncSink, which actually runs BaseSink.request:
    public final void request(long n) {
        if (Operators.validate(n)) {
            Operators.addCap(REQUESTED, this, n);

            LongConsumer consumer = requestConsumer;
            if (n > 0 && consumer != null && !isCancelled()) {
                consumer.accept(n);
            }
            onRequestedFromDownstream();
        }
    }
The onRequestedFromDownstream here dispatches to BufferAsyncSink.onRequestedFromDownstream:
    @Override
    void onRequestedFromDownstream() {
        drain();
    }
which simply calls BufferAsyncSink.drain.

BufferAsyncSink.drain

    void drain() {
        if (WIP.getAndIncrement(this) != 0) {
            return;
        }

        int missed = 1;
        final Subscriber<? super T> a = actual;
        final Queue<T> q = queue;

        for (; ; ) {
            long r = requested;
            long e = 0L;

            while (e != r) {
                if (isCancelled()) {
                    q.clear();
                    return;
                }

                boolean d = done;

                T o = q.poll();

                boolean empty = o == null;

                if (d && empty) {
                    Throwable ex = error;
                    if (ex != null) {
                        super.error(ex);
                    }
                    else {
                        super.complete();
                    }
                    return;
                }

                if (empty) {
                    break;
                }

                a.onNext(o);

                e++;
            }

            if (e == r) {
                if (isCancelled()) {
                    q.clear();
                    return;
                }

                boolean d = done;

                boolean empty = q.isEmpty();

                if (d && empty) {
                    Throwable ex = error;
                    if (ex != null) {
                        super.error(ex);
                    }
                    else {
                        super.complete();
                    }
                    return;
                }
            }

            if (e != 0) {
                Operators.produced(REQUESTED, this, e);
            }

            missed = WIP.addAndGet(this, -missed);
            if (missed == 0) {
                break;
            }
        }
    }
The queue here is the one specified when the BufferAsyncSink was created; its capacity hint defaults to Queues.SMALL_BUFFER_SIZE (Math.max(16, Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")))).
The onNext here is a synchronous call into the LambdaSubscriber's consumer.
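
The capacity hint only sizes the internal links of that unbounded queue; it is not a bound on the number of buffered elements. It can be tuned through the reactor.bufferSize.small system property, for example (a quick check, assuming reactor-core 3.1.x on the classpath):

    import reactor.util.concurrent.Queues;

    public class BufferSizeCheck {
        public static void main(String[] args) {
            // Prints 256 by default; start the JVM with -Dreactor.bufferSize.small=1024 to change it.
            // This only adjusts the capacity hint used by BufferAsyncSink's unbounded queue,
            // it does not turn the BUFFER strategy into a bounded one.
            System.out.println(Queues.SMALL_BUFFER_SIZE);
        }
    }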

FluxCreate.subscribe#source.accept

    source.accept(
            createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :
                    sink);
With CreateMode.PUSH_PULL the sink is wrapped in a SerializedSink, and the lambda consumer supplied to Flux.create is then invoked:
    fluxSink -> {
        //NOTE sink:class reactor.core.publisher.FluxCreate$SerializedSink
        LOGGER.info("sink:{}", fluxSink.getClass());
        while (true) {
            LOGGER.info("sink next");
            fluxSink.next(ThreadLocalRandom.current().nextInt());
        }
    }
From this point on, data starts being pushed.
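
The reason create wraps the sink is thread safety: SerializedSink allows next to be called concurrently from several threads and serializes the emissions through the WIP counter and an internal queue, as the next method below shows. A sketch of multi-threaded emission that relies on this (thread counts and values are illustrative):

    import java.util.concurrent.CountDownLatch;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxSink;

    public class ConcurrentEmitDemo {
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1);
            Flux<String> flux = Flux.create(sink -> {
                // Two producer threads share the same sink; the SerializedSink created by
                // Flux.create serializes their next() calls, so the subscriber never sees
                // concurrent onNext signals.
                for (int t = 0; t < 2; t++) {
                    final int id = t;
                    new Thread(() -> {
                        for (int i = 0; i < 5; i++) {
                            sink.next("thread-" + id + " value-" + i);
                        }
                    }).start();
                }
            }, FluxSink.OverflowStrategy.BUFFER);

            flux.take(10)
                .doOnComplete(latch::countDown)
                .subscribe(v -> System.out.println(Thread.currentThread().getName() + " got " + v));
            latch.await();
        }
    }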

SerializedSink.next

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxCreate.java#SerializedSink.next

    public FluxSink<T> next(T t) {
        Objects.requireNonNull(t, "t is null in sink.next(t)");
        if (sink.isCancelled() || done) {
            Operators.onNextDropped(t, sink.currentContext());
            return this;
        }
        if (WIP.get(this) == 0 && WIP.compareAndSet(this, 0, 1)) {
            try {
                sink.next(t);
            }
            catch (Throwable ex) {
                Operators.onOperatorError(sink, ex, t, sink.currentContext());
            }
            if (WIP.decrementAndGet(this) == 0) {
                return this;
            }
        }
        else {
            Queue<T> q = queue;
            synchronized (this) {
                q.offer(t);
            }
            if (WIP.getAndIncrement(this) != 0) {
                return this;
            }
        }
        drainLoop();
        return this;
    }
This delegates to BufferAsyncSink.next; the value is delivered (directly, or via drainLoop when there was contention) before the call returns.

BufferAsyncSink.next

    public FluxSink<T> next(T t) {
        queue.offer(t);
        drain();
        return this;
    }
Here the value is offered to the queue, then drain pulls it out again and synchronously calls the LambdaSubscriber's onNext.

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/LambdaSubscriber.java

    @Override
    public final void onNext(T x) {
        try {
            if (consumer != null) {
                consumer.accept(x);
            }
        }
        catch (Throwable t) {
            Exceptions.throwIfFatal(t);
            this.subscription.cancel();
            onError(t);
        }
    }
In other words, the consumer passed to subscribe is invoked synchronously; in the example it not only logs but also sleeps, so the call blocks.
Only once it returns does fluxSink.next return, after which the producer loop goes on:
    fluxSink -> {
        //NOTE sink:class reactor.core.publisher.FluxCreate$SerializedSink
        LOGGER.info("sink:{}", fluxSink.getClass());
        while (true) {
            LOGGER.info("sink next");
            fluxSink.next(ThreadLocalRandom.current().nextInt());
        }
    }

Summary

Although the fluxSink lambda looks like an infinite loop of next calls producing data, there is nothing to worry about here: as long as the subscriber and the fluxSink lambda run on the same thread (in this example both run on the main thread), the calls are synchronous and blocking.

On subscription, LambdaSubscriber.onSubscribe is called and request(N) asks for data; then source.accept is invoked, i.e. the fluxSink lambda starts producing data and the stream begins.

Inside fluxSink.next the subscriber's consumer is invoked as a blocking call; only after it returns does the producer loop continue.

As for how the BUFFER strategy can lead to an OutOfMemoryError, it is worth thinking about what has to change for it to happen.
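
One way to reproduce it, sketched below (the scheduler choice and sleep durations are illustrative): move the consumer to another thread with publishOn. The producer loop is then no longer throttled by the slow subscriber, downstream demand is quickly exhausted, and next() keeps piling values into the unbounded BUFFER queue.

    import java.util.concurrent.ThreadLocalRandom;
    import java.util.concurrent.TimeUnit;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.FluxSink;
    import reactor.core.scheduler.Schedulers;

    public class BufferOomDemo {
        public static void main(String[] args) throws InterruptedException {
            Flux<Integer> flux = Flux.<Integer>create(sink -> {
                while (!sink.isCancelled()) {
                    sink.next(ThreadLocalRandom.current().nextInt());
                }
            }, FluxSink.OverflowStrategy.BUFFER);

            // publishOn decouples the producer (main thread) from the consumer (single scheduler).
            // The consumer drains roughly one element every 10 seconds, so upstream demand is
            // replenished extremely slowly, while the producer keeps offering into the
            // BufferAsyncSink's unbounded queue -- the heap eventually fills up
            // (run with a small -Xmx to observe it quickly).
            flux.publishOn(Schedulers.single())
                .subscribe(e -> {
                    try {
                        TimeUnit.SECONDS.sleep(10);
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                    }
                });

            TimeUnit.MINUTES.sleep(20);
        }
    }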
