When batch-inserting thousands of records every 5 seconds with Slick 3, I get:
org.postgresql.util.PSQLException: FATAL: sorry, too many clients already
My data access layer looks like:
val db: CustomPostgresDriver.backend.DatabaseDef =
  Database.forURL(url, user = user, password = password, driver = jdbcDriver)

override def insertBatch(rowList: List[T#TableElementType]): Future[Long] = {
  val res = db.run(insertBatchQuery(rowList))
    .map(_.head.toLong)
    .recover { case ex: Throwable => RelationalRepositoryUtility.handleBatchOperationErrors(ex) }
  // db.close()
  res
}

override def insertBatchQuery(rowList: List[T#TableElementType]): FixedSqlAction[Option[Int], NoStream, Write] = {
  query ++= (rowList)
}

val temp1 = list1.flatMap { li =>
  Future.sequence(li.map { trip =>
    val data = for {
      tripData <- TripDataRepository.insertQuery(trip.tripData)
      subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
    } yield ((tripData, subTripData))
    val res = db.run(data.transactionally)
    res
    // db.close()
  })
}
If I close the connection once the work is done (see the commented-out db.close() calls above), I get this error instead:
java.util.concurrent.RejectedExecutionException: Task slick.backend.DatabaseComponent$DatabaseDef$$anon$2@6c3ae2b6 rejected from java.util.concurrent.ThreadPoolExecutor@79d2d4eb[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 1]

val temp1 = list.map { trip =>
  val data = for {
    tripData <- TripDataRepository.insertQuery(trip.tripData)
    subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
  } yield ((tripData, subTripData))
  val res = db.run(data.transactionally)
  res
}
I still get the "too many clients" error.

Solution
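The root of this problem is that you are starting an unbounded list of Futures at the same time, one per entry in list, and each of them connects to the database.
This can be solved by running the inserts serially, forcing each insert batch to depend on the previous one: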
// Empty Future for the results. Replace Unit with the correct type: whatever
// "res" yields below.
val emptyFuture = Future.successful(Seq.empty[Unit])

// This will only insert one at a time. You could use list.grouped to batch the
// inserts if that was important.
val temp1 = list.foldLeft(emptyFuture) { (previousFuture, trip) =>
  previousFuture flatMap { previous =>
    // Inner code copied from your example.
    val data = for {
      tripData <- TripDataRepository.insertQuery(trip.tripData)
      subTripData <- SubTripDataRepository.insertBatchQuery(getUpdatedSubTripDataList(trip.subTripData, tripData.id))
    } yield ((tripData, subTripData))
    val res = db.run(data.transactionally)
    // flatMap must return a Future, so append the result once "res" completes.
    res.map(result => previous :+ result)
  }
}
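
To see the chaining pattern on its own, here is a minimal sketch that uses only the Scala standard library and no Slick at all. The names SerialInsertSketch, fakeInsert, insertSerially and maxConcurrent are hypothetical and exist only for this illustration; fakeInsert is an assumed stand-in for db.run(...) that sleeps briefly and records how many calls overlap.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SerialInsertSketch {
  // Counters guarded by synchronized, used only to observe overlap.
  private var inFlight = 0
  private var maxInFlight = 0

  private def enter(): Unit = synchronized {
    inFlight += 1
    if (inFlight > maxInFlight) maxInFlight = inFlight
  }

  private def leave(): Unit = synchronized { inFlight -= 1 }

  // Hypothetical stand-in for db.run(...): pretends to do some work
  // and returns the id it was given.
  def fakeInsert(id: Int): Future[Int] = Future {
    enter()
    try {
      Thread.sleep(10)
      id
    } finally leave()
  }

  // Each insert is created inside the flatMap of the previous Future,
  // so it cannot start before the previous one has completed.
  def insertSerially(ids: List[Int]): Future[Seq[Int]] =
    ids.foldLeft(Future.successful(Seq.empty[Int])) { (previousFuture, id) =>
      previousFuture.flatMap { previous =>
        fakeInsert(id).map(result => previous :+ result)
      }
    }

  // Highest number of fakeInsert calls that were running at the same time.
  def maxConcurrent: Int = synchronized(maxInFlight)
}
```

Calling Await.result(SerialInsertSketch.insertSerially(List(1, 2, 3, 4, 5)), 5.seconds) returns the ids in order, and maxConcurrent stays at 1 because the next Future is only created after the previous one finishes. By contrast, list.map(fakeInsert) would create every Future up front, which is the situation described in the question.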