带有JPA 2.2 resultStream的Spring数据到Kotlin的流程

在JPA 2.2之前,如果我想向Kotlin的ScrollableResults发出Flow,我必须这样做:

  override fun findSomeUsers(batch: Int): Flow<User> {
    return flow {
      (em.delegate as Session).sessionFactory.openSession().use { session ->
        val query = session.createQuery("select u from User u where ...")
        query.fetchSize = batch
        query.isReadOnly = true

        query.scroll(ScrollMode.FORWARD_ONLY).use { results ->
          while (results.next()) {
            val u = results.get(0) as User
            emit(u)
          }
        }
      }
    }
  }

我必须将EntityManager下放到Hibernate的Session

但是,由于JPA 2.2的Query支持getResultStream,因此应该有一种更清洁的方式实现这一目标:

  @ExperimentalCoroutinesApi
  override fun findSomeUsers(batchSize: Int): Flow<User> {
    return channelFlow {
      em.createQuery("select u from User u where ...")
        .setHint(HINT_FETCH_SIZE,batchSize) // "org.hibernate.fetchSize"
        .unwrap(javax.persistence.Query::class.java)
        .resultStream
        .asSequence()
        .map { it as User }
        .forEach { u ->
          runBlocking {
            send(u)
          }
        }
    }
  }

嗯,它运作良好,但是有点腥。

首先,为什么我不能只编码resultStream.asSequence.map {it as User}.asFlow()? (客户端一无所获就结束了)

第二,runBlocking块很丑。 runBlocking仅应在测试中使用。但是我发现没有办法在代码中规避它。

有什么办法解决吗?

第三,这与问题无关。看来Spring-Data-JPA仍然不支持这种查询方法:

  @Query("select u from User u where ...") 
  @MaybeSomeQueryHint(batchSize=:batchSize)
  fun findSomeUsers(@Param("name="batchSize") batchSize: Int): Flow<User>

它将加载所有用户,然后抱怨重复的行...

客户端(测试)端代码是如此简单:

  @ExperimentalCoroutinesApi
  @Test
  @Transactional
  open fun testUsers() {
    runBlocking {
      userDao.findSomeUsers(100).collectIndexed { index,u: User ->
        logger.info("[{}] {}",index,u)
      }
    }
  }

对于@Marko来说,Stream版本的效果很好:

  override fun findSomeUserStream(batchSize: Int): Stream<User> {
    return em.createQuery("select u from User u where ...")
      .setHint(HINT_FETCH_SIZE,batchSize) // "org.hibernate.fetchSize"
      .unwrap(javax.persistence.Query::class.java)
      .resultStream
      .map { it as User }
  }


  @Transactional // without this annotation,"Operation not allowed after ResultSet closed" will be thrown
  @Test
  open fun testUserStream() {
    runBlocking {
      userDao.findSomeUserStream(100).forEach { u ->
        logger.info("{}",u)
      }
    }
  }


  // it works !!
  @Transactional
  @Test
  open fun testUserStream2() {
    runBlocking {
      userDao.findSomeUserStream(100).asSequence().asFlow().collect { u ->
        logger.info("{}",u)
      }
    }
  }
wish4star 回答:带有JPA 2.2 resultStream的Spring数据到Kotlin的流程

定义补丁从Stream.toSequence()Stream的转换,而不是修补Flow的结果:

fun <T> Stream<T>.asFlow() = flow {
    for (t in iterator()) {
        emit(t)
    }
}

如果将此代码示例与之配合使用:

suspend fun main() {
    Stream.of("a","b")
            .asFlow()
            .collect { println(it) }
}

它将打印

a
b

您的函数应如下所示:

override fun findSomeUsers(batchSize: Int): Flow<User> {
    return em.createQuery("select u from User u where ...")
            .setHint(HINT_FETCH_SIZE,batchSize) // "org.hibernate.fetchSize"
            .unwrap(javax.persistence.Query::class.java)
            .resultStream
            .asFlow()
            .map { it as User }
}
本文链接:https://www.f2er.com/3164081.html

大家都在问