我希望可以在列表中找到处理管道的输出,但是程序会立即退出。
您在那里的代码在主线程上建立了反应链,然后在主线程上执行了其他任何操作。因此,主线程完成了其工作,并且由于boundedElastic()
线程是守护程序线程,因此没有其他线程阻止程序退出,因此它退出了。
您可以通过一个更简单的示例看到相同的行为:
Flux<Integer> f = Flux.just(1,2,3,4,5)
.delayElements(Duration.ofMillis(500));
f.subscribe(System.out::println);
您当然可以调用newBoundedElastic("name",false)
使其成为非守护进程支持的调度程序,但是随后您必须对其进行跟踪并在完成后调用dispose,因此确实可以解决问题(程序将无限运行,直到您将调度程序销毁为止。)
快速的'n'肮脏解决方案只是阻塞Flux
的最后一个元素作为程序的最后一行-因此,如果我们添加:
f.blockLast();
...然后程序在退出之前等待最后一个元素被发出,并且我们具有所要遵循的行为。
对于概念的简单证明,这很好。但是,这在“生产”代码中并不理想。首先,“无阻塞”是响应式代码中的通用规则,因此,如果您有这样的阻塞调用,则很难确定是否是有意的。如果添加了其他链,并且还希望它们完成,则必须为每个链添加阻塞呼叫。那是一团糟,而且不是真正可持续的。
更好的解决方案是使用CountDownLatch
:
CountDownLatch cdl = new CountDownLatch(1);
Flux.just(1,5)
.delayElements(Duration.ofMillis(500))
.doFinally(s -> cdl.countDown())
.subscribe(System.out::println);
cdl.await();
这具有不显式阻止的优点,并且还能够一次处理多个发布者(如果您将初始值设置为大于1)。这也是我通常推荐的这种方法问题-因此,如果您想要最广泛接受的解决方案,那可能就是这样。
但是,对于所有需要等待多个发布者而不是一个发布者的示例,我倾向于使用Phaser
,它与CountdownLatch类似,但是可以动态register()
和{{ 1}}。这意味着您可以创建单个相位器,然后根据需要轻松地向其注册多个发布者,而无需更改初始值,例如:
deregister()
(当然,如果需要,您也可以将Phaser phaser = new Phaser(1);
Flux.just(1,5)
.doOnSubscribe(s -> phaser.register())
.delayElements(Duration.ofMillis(500))
.doFinally(s -> phaser.arriveAndDeregister())
.subscribe(System.out::println);
Flux.just(1,5,6,7,8)
.doOnSubscribe(s -> phaser.register())
.delayElements(Duration.ofMillis(500))
.doFinally(s -> phaser.arriveAndDeregister())
.subscribe(System.out::println);
phaser.arriveAndAwaitAdvance();
和onSubscribe
逻辑包装在单独的方法中。)
本文链接:https://www.f2er.com/3169941.html