.then
(仅用于终端的源链)
使用.then
,以链接您的执行与进程,该进程仅发送终端信号。
另外,请注意,如果您需要对错误信号进行某些操作,则必须事先将.then
与onErrorResume
一起使用。
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
.onErrorResume(t -> ...) // should be before then
.then(this.save.execute(o))
}
}
.defer
为了推迟单声道创建
为了仅在成功验证的情况下执行this.save.execute(o)
,您还必须将其包装在Mono.defer
中:
public class AsyncService {
private AsyncValidationTask validation;
private AsyncSaveTask save;
public Mono<Object> validateAndSave(Object o) {
return Mono.defer(() -> this.validation.execute(o))
.onErrorResume(t -> ...) // should be before then
.then(Mono.defer(() -> this.save.execute(o)))
}
}
通常没有必要 ,因为Mono
是 LAZY 类型,其中 订阅 (订阅== {.subscribe()
)的情况下,strong> SHOULD 开始才开始工作。
由Mono#then
返回的对Mono
的订阅this.save.execute
保证的实现开始,方法 之后正确 Mono.defer(() -> this.validation.execute(o))
完成。
执行可能较早开始的唯一原因可能是目的(例如,故意提供这种行为的业务逻辑-缓存,热源等)。 OR this.save.execute(o)
的 INCORRECT 实现,无论实际订阅如何,它都开始工作。
正确设计实现
通常,最好确保能正常工作的API并将其公开为Publisher
(例如Mono
| Flux
)是惰性的。
这意味着API创建者必须确保仅在用户订阅了给定的Publisher
实例的情况下,才执行工作。
例如,如果您的异步API在下面进行了CompletableFuture
创建,则值得将CompletableFuture
创建的内容手动包装到Mono.defer
或使用适当的方法扩展名,例如Mono.fromFuture(Supplier<? extends CompletableFuture<? extends T>> futureSupplier)
执行官示例
让我们考虑如何使常规ThreadPool任务提交处于活动状态。
interface Executor {
Future<T> execute(Callable<T> runnable);
}
因此,为了使Executor
具有反应性,我们必须创建如下内容:
interface ReactiveExecutor {
Mono<T> execute(Callable<T> runnable);
}
实施不正确
以下是可行的这种适配器的可能实现:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
MonoProcessor<T> result = MonoProcessor.create();
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
result.onNext(value);
result.onComplet();
return value;
});
return result.doOnCancel(() -> task.cancel());
}
}
当然,这样的实现将起作用。但是,它存在一些关键问题:
- 执行从方法调用开始(这与反应流
Publisher
的惰性行为有些矛盾)
- 由于执行是在实际任务订阅之前开始的,因此我们必须创建一个有状态的
Mono
,它支持以后的订阅。
- 此实现无法处理根本没有订阅者的情况(例如,执行已经开始,但是没有发生
.subscribe
方法(然后我们就发生了值泄漏,无法处理)
- 一般来说,它太笨拙,无法解决。另外,为了防止前面提到的所有情况,有必要在实现之外用
Mono execute(..)
包装Mono.defer
上的每个调用(请参阅问题中的原始问题)。随后,这导致一个事实,即API用户可以轻松地“自发自足”,而无需使用额外的.defer
来包装执行
那么,如何解决呢?
基本上,将Mono.defer
移动到库内部就足够了。这将使API用户的生活变得更加轻松,因为他们不必考虑何时需要使用延迟(因此,减少了可能的问题)。
例如,我们的反应式执行器最简单的解决方案可以是:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
Mono.defer(() -> {
MonoProcessor<T> result = MonoProcessor.create();
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
result.onNext(value);
result.onComplet();
return value;
});
return result.doOnCancel(() -> task.cancel());
})
}
}
仅通过推迟执行,就可以确定解决至少一个问题-确保不再泄漏值。
但是,如何正确解决呢?
但是,为了解决特定情况下的所有可能问题,我们可以使用Mono.create
,它是为采用异步API设计的:
class ReactiveExecutorAdapter {
final Executor delegate;
...
Mono<T> execute(Callable<T> runnable) {
Mono.crete(monoSink -> {
Future<T> task = delegate.execute(() -> {
T value = runnable.call();
monoSink.complete(value);
});
monoSink.doOnCancel(task::cancel);
})
}
}
使用Mono.create
可以保证每个订阅者都可以延迟执行。
另外,使用MonoSink
API,我们可以快速挂接到来自订户的所有基本信号。
最后,Mono.create确保在任何情况下,该值都将被适当丢弃。
最后,有了这样的API,就不必在所有情况下都使用defer
,
不知道我是否正确理解了问题,但是。.看OP问题中方法的签名,它们看起来并不像ExecutorService
的“异步任务”,它们看起来像仅仅是{{ 1}}生产方法,这在“反应性”世界中经常会遇到。那么问题就是将它们与
之类的东西组合在一起
Mono
Mono<Object> validateAndSave(Object o) {
return validation.execute(o)
.then(save.execute(o));
将忽略源(即.then
)发出的元素,但不会忽略错误信号。因此,对于validation.execute
,您的方法将返回onError
,否则将返回Mono.error()
返回的内容。
本文链接:https://www.f2er.com/2588124.html