为什么使用switchIfEmpty时项目反应堆会无限期挂起?

背景

我正在使用Spring Boot 2.2.1,project-reactor 3.3.0和spring-data-mongodb 2.2.1,并且正在尝试从多个查询中加载数据。我的代码大致如下:

Flux.just("type1","type2","type3","type4")
    .concatMap { type ->
        reactiveMongoOperations.find<Map<String,Any>>(BasicQuery("{'type': '$type'}"),"collectionName")
                                .doOnError { e ->
                                    log.error("Caught exception when reading from mongodb: ${e::class.simpleName} - ${e.message}",e)
                                }.switchIfEmpty {
                                    log.warn("Failed to find any documents of type $type")
                                    Mono.empty<Map<String,Any>>()
                                }
    } 
    .. // More operations here
    .subscribe()

问题在于,如果reactiveMongoOperations.find(..)找不到给定类型的任何文档(因此记录了"Failed to find any documents of type $type"),则整个操作将无限期地挂起。如果删除switchIfEmpty子句,则操作完成,一切正常。

问题

  1. 如果添加switchIfEmpty操作,为什么整个操作都将挂起?我使用flatMap而不是concatMap都没关系,它最终还是会挂起。
  2. 我应该如何记录该特定查询未找到任何文档?即我要记录的是,reactiveMongoOperations.find(..)返回空的Flux时找不到文档。
jnnakj 回答:为什么使用switchIfEmpty时项目反应堆会无限期挂起?

当从Kotlin将代码重写为Java时(如Thomas在评论中所建议),我找到了答案!我以为我使用了reactor.kotlin.core.publisher.switchIfEmpty库提供的Kotlin reactor-kotlin-extensions扩展功能:

fun <T> Flux<T>.switchIfEmpty(s: () -> Publisher<T>): Flux<T> = this.switchIfEmpty(Flux.defer { s() })

情况并非如此,因此我最终使用了switchIfEmpty定义的Flux方法,如下所示:

public final Flux<T> switchIfEmpty(Publisher<? extends T> alternate)

要使其在没有扩展功能的情况下工作,我可能应该做这样的事情:

.. 
.switchIfEmpty { subscriber ->
    log.warn("Failed to find any documents of type $type")
    subscriber.onComplete()
}

我最初的解决方案不起作用,因为Java版本假定我创建一个Publisher(我这样做了),并且还调用为此功能发布商(我没有)。在Kotlin中,lambda参数是可选的,如果不需要的话,这就是为什么类型系统没有捕获到该参数的原因。

这是Kotlin与Java互操作的技巧之一。

本文链接:https://www.f2er.com/3056399.html

大家都在问