Java反应器-链Mono <Void>与另一个产生Mono <Object>的异步任务 实施不正确那么,如何解决呢?但是,如何正确解决呢?

我有以下异步任务:

select 
    count(*) filter(where cm.email  is null) in_cms_but_not_in_cm,-- 369
    count(*) filter(where cms.email is null) in_cm_but_not_in_cms   -- 9
from cm
full join cms using(email)
where cm.email is null or cms.email is null
public class AsyncValidationTask {
    // Returns Mono.error(new Exception()) if error,otherwise Mono.empty()
    public Mono<Void> execute(Object o);
}

及以下服务类别:

public class AsyncSavetask {
    // Returns Mono.error(new Exception()) if error,otherwise Mono of Object
    public Mono<Object> execute(Object o);
}

如上所示,如果public class AsyncService { private AsyncValidationTask validation; private AsyncSavetask save; public Mono<Object> validateAndSave(Object o) { return Mono.defer(() -> this.validation.execute(o)) // Right now,the problem is that when validation completes successfully,it // emits Mono.empty hence the flatMap chained below will not be invoked at all. .flatMap(dontcare -> this.save.execute(o)) } } 成功完成,我尝试使用flatMap链接AsyncSavetask.execute调用,它将无法工作,因为在此之后没有发出任何东西(Mono.empty)完成。

我还考虑AsyncValidationTask.execute链接第二个调用,但是无论第一个验证调用产生的Mono.error为何,它将始终调用链接的调用。

如何正确链接它们?

yxj366 回答:Java反应器-链Mono <Void>与另一个产生Mono <Object>的异步任务 实施不正确那么,如何解决呢?但是,如何正确解决呢?

.then(仅用于终端的源链)

使用.then,以链接您的执行与进程,该进程仅发送终端信号。

另外,请注意,如果您需要对错误信号进行某些操作,则必须事先将.thenonErrorResume一起使用。

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(this.save.execute(o))
    }
}

.defer为了推迟单声道创建

为了仅在成功验证的情况下执行this.save.execute(o),您还必须将其包装在Mono.defer中:

public class AsyncService {

    private AsyncValidationTask validation;

    private AsyncSaveTask save;

    public Mono<Object> validateAndSave(Object o) {
        return Mono.defer(() -> this.validation.execute(o))
                   .onErrorResume(t -> ...) // should be before then
                   .then(Mono.defer(() -> this.save.execute(o)))
    }
}

通常没有必要 ,因为Mono LAZY 类型,其中 订阅 (订阅== {.subscribe())的情况下,strong> SHOULD 开始才开始工作。

Mono#then返回的对Mono的订阅this.save.execute 保证的实现开始,方法 之后正确 Mono.defer(() -> this.validation.execute(o)) 完成

执行可能较早开始的唯一原因可能是目的(例如,故意提供这种行为的业务逻辑-缓存,热源等)。 OR this.save.execute(o) INCORRECT 实现,无论实际订阅如何,它都开始工作。

正确设计实现

通常,最好确保能正常工作的API并将其公开为Publisher(例如Mono | Flux)是惰性的。

这意味着API创建者必须确保仅在用户订阅了给定的Publisher实例的情况下,才执行工作。

例如,如果您的异步API在下面进行了CompletableFuture创建,则值得将CompletableFuture创建的内容手动包装到Mono.defer或使用适当的方法扩展名,例如Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)

执行官示例

让我们考虑如何使常规ThreadPool任务提交处于活动状态。

interface Executor  {
  Future<T> execute(Callable<T> runnable); 
}

因此,为了使Executor具有反应性,我们必须创建如下内容:

interface ReactiveExecutor {
   Mono<T> execute(Callable<T> runnable);
}

实施不正确

以下是可行的这种适配器的可能实现:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      MonoProcessor<T> result = MonoProcessor.create();
      Future<T> task = delegate.execute(() -> {
          T value = runnable.call();
          result.onNext(value);
          result.onComplet();
          return value;
      });

      return result.doOnCancel(() -> task.cancel());
   }
}

当然,这样的实现将起作用。但是,它存在一些关键问题:

  1. 执行从方法调用开始(这与反应流Publisher的惰性行为有些矛盾)
  2. 由于执行是在实际任务订阅之前开始的,因此我们必须创建一个有状态的Mono,它支持以后的订阅。
  3. 此实现无法处理根本没有订阅者的情况(例如,执行已经开始,但是没有发生.subscribe方法(然后我们就发生了值泄漏,无法处理)
  4. 一般来说,它太笨拙,无法解决。另外,为了防止前面提到的所有情况,有必要在实现之外用Mono execute(..)包装Mono.defer上的每个调用(请参阅问题中的原始问题)。随后,这导致一个事实,即API用户可以轻松地“自发自足”,而无需使用额外的.defer
  5. 来包装执行

那么,如何解决呢?

基本上,将Mono.defer移动到库内部就足够了。这将使API用户的生活变得更加轻松,因为他们不必考虑何时需要使用延迟(因此,减少了可能的问题)。

例如,我们的反应式执行器最简单的解决方案可以是:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      Mono.defer(() -> {
          MonoProcessor<T> result = MonoProcessor.create();
          Future<T> task = delegate.execute(() -> {
              T value = runnable.call();
              result.onNext(value);
              result.onComplet();
              return value;
          });

          return result.doOnCancel(() -> task.cancel());
     })
   }
}

仅通过推迟执行,就可以确定解决至少一个问题-确保不再泄漏值。

但是,如何正确解决呢?

但是,为了解决特定情况下的所有可能问题,我们可以使用Mono.create,它是为采用异步API设计的:

class ReactiveExecutorAdapter {
   final Executor delegate;

   ...


   Mono<T> execute(Callable<T> runnable) {
      Mono.crete(monoSink -> {

          Future<T> task = delegate.execute(() -> {
              T value = runnable.call();
              monoSink.complete(value);
          });

          monoSink.doOnCancel(task::cancel);
     })
   }
}

使用Mono.create可以保证每个订阅者都可以延迟执行。 另外,使用MonoSink API,我们可以快速挂接到来自订户的所有基本信号。 最后,Mono.create确保在任何情况下,该值都将被适当丢弃。

最后,有了这样的API,就不必在所有情况下都使用defer

,

不知道我是否正确理解了问题,但是。.看OP问题中方法的签名,它们看起来并不像ExecutorService的“异步任务”,它们看起来像仅仅是{{ 1}}生产方法,这在“反应性”世界中经常会遇到。那么问题就是将它们与

之类的东西组合在一起
Mono

Mono<Object> validateAndSave(Object o) { return validation.execute(o) .then(save.execute(o)); 将忽略源(即.then)发出的元素,但不会忽略错误信号。因此,对于validation.execute,您的方法将返回onError,否则将返回Mono.error()返回的内容。

本文链接:https://www.f2er.com/2588124.html

大家都在问