How to implement multi-threaded request hedging in Kotlin?

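Spring's Reactor has an interesting feature: hedging. It means firing off many requests, taking the first result that comes back, and automatically cleaning up the other contexts. Josh Long has been actively promoting this feature recently. Googling "Spring reactor hedging" turns up relevant results. If anyone is curious, the sample code is here. In short, Flux.first() hides all the underlying complexity, which is impressive.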

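I wonder how to achieve this with Kotlin coroutines and multithreading (maybe with Flow or Channel). I came up with a simple scenario: a service accepts a longUrl, sends it to many URL-shortening services (e.g. IsGd, TinyUrl...), and returns the first URL that comes back... (and terminates/cleans up the other threads/coroutine resources).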

There is an interface UrlShorter that defines this job:

interface UrlShorter {
  fun getShortUrl(longUrl: String): String?
}

There are three implementations: one for is.gd, one for tinyUrl, and a third, dumb one (DumbImpl) that blocks for 10 seconds and returns null:

class IsgdImpl : UrlShorter {
  override fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}",Thread.currentThread().name)
    // The is.gd API URL is blocked by SO, it sucks. See the underlying gist for the full code.
    val url = "https://is.gd/_create.php?format=simple&url=%s".format(URLEncoder.encode(longUrl,"UTF-8"))
    return Request.Get(url).execute().returnContent().asString().also {
      logger.info("returning {}",it)
    }
  }
}

class TinyImpl : UrlShorter {
  override fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}",Thread.currentThread().name)
    val url = "http://tinyurl.com/_api-create.php?url=$longUrl" // sorry, the URL is blocked by Stack Overflow; see the underlying gist for the full code
    return Request.Get(url).execute().returnContent().asString().also {
      logger.info("returning {}",it)
    }
  }
}

class DumbImpl : UrlShorter {
  override fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}",Thread.currentThread().name)
    TimeUnit.SECONDS.sleep(10)
    return null
  }
}

There is a UrlShorterService that accepts all the UrlShorter implementations, tries to spawn coroutines, and gets the first result.

This is what I came up with:

@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterService(private val impls: List<UrlShorter>) {


  private val es: ExecutorService = Executors.newFixedThreadPool(impls.size)
  private val esDispatcher = es.asCoroutineDispatcher()

  suspend fun getShortUrl(longUrl: String): String {
    return method1(longUrl) // there are other methods,with different ways...
  }

  private inline fun <T,R : Any> Iterable<T>.firstNotNullResult(transform: (T) -> R?): R? {
    for (element in this) {
      val result = transform(element)
      if (result != null) return result
    }
    return null
  }

The client, a test, is also simple:

@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterServiceTest {

  @Test
  fun testHedging() {
    val impls = listOf(DumbImpl(),IsgdImpl(),TinyImpl()) // Dumb first
    val service = UrlShorterService(impls)
    runBlocking {
      service.getShortUrl("https://www.google.com").also {
        logger.info("result = {}",it)
      }
    }
  }
}

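Note that I put DumbImpl first because I expect it to be spawned first and block its thread. The other two implementations should be able to return results.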

OK, here is the problem: how do I implement hedging in Kotlin? I tried the following method:

  private suspend fun method1(longUrl: String): String {
    return impls.asSequence().asFlow().flatMapMerge(impls.size) { impl ->
      flow {
        impl.getShortUrl(longUrl)?.also {
          emit(it)
        }
      }.flowOn(esDispatcher)
    }.first()
      .also { esDispatcher.cancelChildren() } // doesn't impact the result
  }

I expected method1 to work, but it takes the full 10 seconds:

00:56:09,253 INFO  TinyImpl - running : pool-1-thread-3
00:56:09,254 INFO  DumbImpl - running : pool-1-thread-1
00:56:09,253 INFO  IsgdImpl - running : pool-1-thread-2
00:56:11,150 INFO  TinyImpl - returning // tiny url blocked by SO,it sucks
00:56:13,604 INFO  IsgdImpl - returning // idGd url blocked by SO,it sucks
00:56:19,261 INFO  UrlShorterServiceTest$testHedging$1 - result = // tiny url blocked by SO,it sucks
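A likely explanation for the full 10 seconds: first() does cancel the other inner flows, but a coroutine stuck in a plain blocking call such as TimeUnit.SECONDS.sleep never checks for cancellation, and flatMapMerge, like any coroutine scope, waits for all of its children before completing. The sketch below, assuming kotlinx.coroutines 1.3.6 or newer (for runInterruptible), compares cancelling a plain blocking call with cancelling the same call wrapped in runInterruptible, which interrupts the worker thread. blockingWork is a hypothetical stand-in for DumbImpl, shortened to one second.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for DumbImpl: a blocking call that knows nothing about coroutines.
fun blockingWork(): String? {
    Thread.sleep(1_000)
    return null
}

// How long (ms) a 100 ms timeout really takes when the work is a plain blocking call.
suspend fun timeToCancelPlain(): Long = measureTimeMillis {
    // The timeout cancels the coroutine, but the worker thread keeps sleeping,
    // and withTimeoutOrNull waits for its child to finish before returning.
    withTimeoutOrNull(100) { withContext(Dispatchers.IO) { blockingWork() } }
}

// Same call, but wrapped in runInterruptible.
suspend fun timeToCancelInterruptible(): Long = measureTimeMillis {
    // On cancellation runInterruptible interrupts the worker thread, so
    // Thread.sleep throws and the call is abandoned almost immediately.
    withTimeoutOrNull(100) { runInterruptible(Dispatchers.IO) { blockingWork() } }
}
```

Calling both from runBlocking should show the first taking roughly the full second and the second returning shortly after the 100 ms timeout; exact numbers depend on the machine.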

Then I tried other approaches, method2, method3, method4, method5..., and none of them work either:

  /**
   * 00:54:29,035 INFO  IsgdImpl - running : pool-1-thread-3
   * 00:54:29,036 INFO  DumbImpl - running : pool-1-thread-2
   * 00:54:29,035 INFO  TinyImpl - running : pool-1-thread-1
   * 00:54:30,228 INFO  TinyImpl - returning // tiny url blocked by SO,it sucks
   * 00:54:30,797 INFO  IsgdImpl - returning // idGd url blocked by SO,it sucks
   * 00:54:39,046 INFO  UrlShorterServiceTest$testHedging$1 - result = // idGd url blocked by SO,it sucks
   */
  private suspend fun method2(longUrl: String): String {
    return withContext(esDispatcher) {
      impls.map { impl ->
        async(esDispatcher) {
          impl.getShortUrl(longUrl)
        }
      }.firstNotNullResult { it.await() } ?: longUrl
    }
  }
  /**
   * 00:52:30,681 INFO  IsgdImpl - running : pool-1-thread-2
   * 00:52:30,682 INFO  DumbImpl - running : pool-1-thread-1
   * 00:52:30,681 INFO  TinyImpl - running : pool-1-thread-3
   * 00:52:31,838 INFO  TinyImpl - returning // tiny url blocked by SO,it sucks
   * 00:52:33,721 INFO  IsgdImpl - returning // idGd url blocked by SO,it sucks
   * 00:52:40,691 INFO  UrlShorterServiceTest$testHedging$1 - result = // idGd url blocked by SO,it sucks
   */
  private suspend fun method3(longUrl: String): String {
    return coroutineScope {
      impls.map { impl ->
        async(esDispatcher) {
          impl.getShortUrl(longUrl)
        }
      }.firstNotNullResult { it.await() } ?: longUrl
    }
  }
  /**
   * 01:58:56,930 INFO  TinyImpl - running : pool-1-thread-1
   * 01:58:56,933 INFO  DumbImpl - running : pool-1-thread-2
   * 01:58:56,930 INFO  IsgdImpl - running : pool-1-thread-3
   * 01:58:58,411 INFO  TinyImpl - returning // tiny url blocked by SO,it sucks
   * 01:58:59,026 INFO  IsgdImpl - returning // idGd url blocked by SO,it sucks
   * 01:59:06,942 INFO  UrlShorterServiceTest$testHedging$1 - result =  // idGd url blocked by SO,it sucks
   */
  private suspend fun method4(longUrl: String): String {
    return withContext(esDispatcher) {
      impls.map { impl ->
        async {
          impl.getShortUrl(longUrl)
        }
      }.firstNotNullResult { it.await() } ?: longUrl
    }
  }

I'm not familiar with Channel, and this one throws an exception:

  /**
   * 01:29:44,460 INFO  UrlShorterService$method5$2 - channel closed
   * 01:29:44,461 INFO  DumbImpl - running : pool-1-thread-2
   * 01:29:44,460 INFO  IsgdImpl - running : pool-1-thread-3
   * 01:29:44,466 INFO  TinyImpl - running : pool-1-thread-1
   * 01:29:45,765 INFO  TinyImpl - returning // tiny url blocked by SO,it sucks
   * 01:29:46,339 INFO  IsgdImpl - returning // idGd url blocked by SO,it sucks
   *
   * kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
   *
   */
  private suspend fun method5(longUrl: String): String {
    val channel = Channel<String>()

    withContext(esDispatcher) {
      impls.forEach { impl ->
        launch {
          impl.getShortUrl(longUrl)?.also {
            channel.send(it)
          }
        }
      }
      channel.close()
      logger.info("channel closed")
    }

    return channel.consumeAsFlow().first()
  }
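In method5, channel.close() runs as soon as the launch calls return, before any coroutine has had a chance to send, which is what produces the ClosedSendChannelException. Receiving only after withContext returns would be too late anyway, because withContext waits for all of its children. A minimal channel-based sketch, assuming the question's blocking UrlShorter interface and kotlinx.coroutines 1.3.6+: it receives inside the scope in completion order, stops at the first non-null result, and cancels the rest. Wrapping the blocking call in runInterruptible is a swapped-in technique, not part of the original code, and only helps when the call reacts to thread interrupts (TimeUnit.SECONDS.sleep does; not every HTTP client does). hedgeWithChannel is a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

interface UrlShorter {
    fun getShortUrl(longUrl: String): String? // blocking, as in the question
}

// Returns the first non-null short URL, or null if every implementation failed.
suspend fun hedgeWithChannel(impls: List<UrlShorter>, longUrl: String): String? = coroutineScope {
    // Buffered so that late senders never block after we stop receiving.
    val results = Channel<String?>(capacity = impls.size)
    impls.forEach { impl ->
        launch {
            val url = try {
                // Blocking call on an IO thread; the thread is interrupted if we are cancelled.
                runInterruptible(Dispatchers.IO) { impl.getShortUrl(longUrl) }
            } catch (e: CancellationException) {
                throw e // let cancellation propagate
            } catch (e: Exception) {
                null // a failing service counts as "no result"
            }
            results.send(url)
        }
    }
    try {
        // Exactly one message per implementation arrives, in completion order.
        repeat(impls.size) {
            results.receive()?.let { return@coroutineScope it }
        }
        null
    } finally {
        coroutineContext.cancelChildren() // stop the slower requests
    }
}
```

Unlike firstNotNullResult in method2 to method4, which awaits the Deferreds in list order (so it waits for DumbImpl first), this consumes results in the order they finish.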

Well, I don't know whether there are other ways... but none of the methods above work: all of them block for at least 10 seconds (blocked by DumbImpl).

The whole source code is available in a GitHub gist.

How can hedging be implemented in Kotlin? With Deferred, Flow, Channel, or some other, better idea? Thanks.

After posting the question, I found that all the tinyurl and is.gd URLs are blocked by SO. That really sucks!

Answer by steve313: How to implement multi-threaded request hedging in Kotlin?

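If the actual work you want to run in parallel consists of network fetches, you should choose an asynchronous networking library so that you can properly use non-blocking coroutines with it. For example, since version 11 the JDK provides an asynchronous HTTP client, which you can use as follows: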

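import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse.BodyHandlers
import kotlinx.coroutines.future.await

val httpClient: HttpClient = HttpClient.newHttpClient()

// Suspends, without blocking a thread, until the asynchronous request completes.
suspend fun httpGet(url: String): String = httpClient
        .sendAsync(
                HttpRequest.newBuilder().uri(URI.create(url)).build(), BodyHandlers.ofString())
        .await()
        .body()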
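The advantage of an asynchronous client is that cancellation can reach the request itself. A small local illustration with no network involved: according to the kotlinx.coroutines documentation, CompletionStage.await (package kotlinx.coroutines.future, part of kotlinx-coroutines-core since 1.7 and of kotlinx-coroutines-jdk8 before that) is cancellable and cancels the underlying CompletableFuture when the awaiting coroutine is cancelled. The never-completed future is a hypothetical stand-in for a sendAsync call that hangs; whether cancelling that future also aborts the HTTP exchange depends on the client and JDK version. awaitOrGiveUp is a hypothetical name.

```kotlin
import kotlinx.coroutines.future.await
import kotlinx.coroutines.withTimeoutOrNull
import java.util.concurrent.CompletableFuture

// Waits for an asynchronous result, giving up after timeoutMs.
// On timeout the awaiting coroutine is cancelled, and await() then cancels the future.
suspend fun awaitOrGiveUp(future: CompletableFuture<String>, timeoutMs: Long): String? =
    withTimeoutOrNull(timeoutMs) { future.await() }
```

No thread is parked while waiting, so many such requests can be hedged from a single thread.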

With a suspendable implementation like the one above, here is a function that performs request hedging:

class UrlShortenerService(
        private val impls: List<UrlShortener>
) {
    suspend fun getShortUrl(longUrl: String): String? = impls
            .asFlow()
            .flatMapMerge(impls.size) { impl ->
                flow<String?> {
                    try {
                        impl.getShortUrl(longUrl)?.also { emit(it) }
                    }
                    catch (e: Exception) { 
                        // maybe log it,but don't let it propagate
                    }
                }
            }
            .onCompletion { emit(null) }
            .first()
}

Note that there are no custom dispatchers: you don't need them for suspendable work. Any dispatcher will do, and all the work can even run on a single thread.

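The onCompletion part kicks in when all of your URL shorteners fail. In that case the flatMapMerge stage emits nothing, and without the extra null injected into the flow, first() would throw NoSuchElementException, since it does not return null for an empty flow.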
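The empty-flow case can also be handled with Flow.firstOrNull(), which returns null when the flow completes without emitting. A sketch of the same hedge using it instead of onCompletion { emit(null) } plus first(), assuming a suspendable UrlShortener like the one in this answer. It also catches exceptions only around getShortUrl rather than around emit, so a CancellationException coming from downstream is not swallowed. hedge is a hypothetical name.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow

interface UrlShortener {
    suspend fun getShortUrl(longUrl: String): String?
}

@OptIn(FlowPreview::class) // flatMapMerge is a preview API
suspend fun hedge(impls: List<UrlShortener>, longUrl: String): String? = impls
    .asFlow()
    // Run all services concurrently (concurrency must be at least 1).
    .flatMapMerge(impls.size.coerceAtLeast(1)) { impl ->
        flow<String> {
            val url = try {
                impl.getShortUrl(longUrl)
            } catch (e: CancellationException) {
                throw e // let cancellation propagate
            } catch (e: Exception) {
                null // a failing service counts as "no result"
            }
            if (url != null) emit(url)
        }
    }
    // First URL emitted by any service, or null if the merged flow completes empty.
    .firstOrNull()
```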

To test it, I used the following code:

import kotlinx.coroutines.delay
import kotlin.system.measureTimeMillis

class Shortener(
        private val delay: Long
) : UrlShortener {
    override suspend fun getShortUrl(longUrl: String): String? {
        delay(delay * 1000)
        println("Shortener $delay completing")
        if (delay == 1L) {
            throw Exception("failed service")
        }
        if (delay == 2L) {
            return null
        }
        return "shortened after $delay seconds"
    }
}

suspend fun main() {
    val shorteners = listOf(
            Shortener(4),Shortener(3),Shortener(2),Shortener(1)
    )
    measureTimeMillis {
        UrlShortenerService(shorteners).getShortUrl("bla").also {
            println(it)
        }
    }.also {
        println("Took $it ms")
    }
}

This exercises various failure cases, such as returning null or failing with an exception. For this code I get the following output:

Shortener 1 completing
Shortener 2 completing
Shortener 3 completing
shortened after 3 seconds
Took 3080 ms

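We can see that shorteners 1 and 2 completed, but with a failure; shortener 3 returned a valid response; and shortener 4 was cancelled before it completed. I think this matches the requirements.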


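If you can't get away from blocking requests, your implementation will have to start num_impls * num_concurrent_requests threads, which isn't great. But if that's the best you can have, here is an implementation that hedges blocking requests while awaiting them in a suspendable, cancellable way. It sends an interrupt signal to the worker threads running the requests, but if your library's IO code is not interruptible, those threads will hang until their requests complete or time out.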

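import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors

val es = Executors.newCachedThreadPool()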

interface UrlShortener {
    fun getShortUrl(longUrl: String): String? // not suspendable!
}

class UrlShortenerService(
        private val impls: List<UrlShortener>
) {
    suspend fun getShortUrl(longUrl: String): String {
        val chan = Channel<String?>()
        val futures = impls.map { impl -> es.submit {
            try {
                impl.getShortUrl(longUrl)
            } catch (e: Exception) {
                null
            }.also { runBlocking { chan.send(it) } }
        } }
        try {
            (1..impls.size).forEach { _ ->
                chan.receive()?.also { return it }
            }
            throw Exception("All services failed")
        } finally {
            chan.close()
            futures.forEach { it.cancel(true) }
        }
    }
}
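For blocking implementations the JDK already packages this pattern: ExecutorService.invokeAny runs a collection of Callables, returns the result of one that completed successfully (without throwing), and cancels the tasks that have not completed. Because a null result counts as success there, the sketch below turns null into an exception. It is an alternative to the answer's code rather than part of it; it blocks the calling thread (from a coroutine you might wrap it in withContext(Dispatchers.IO)), and the same thread-count and interruptibility caveats apply. hedgeBlocking is a hypothetical name.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService

interface UrlShortener {
    fun getShortUrl(longUrl: String): String? // blocking, as in the question
}

// Returns the first non-null result, or null if every implementation failed or returned null.
fun hedgeBlocking(es: ExecutorService, impls: List<UrlShortener>, longUrl: String): String? {
    val tasks = impls.map { impl ->
        Callable {
            // invokeAny treats null as a successful result, so turn it into a failure.
            impl.getShortUrl(longUrl) ?: throw IllegalStateException("no result from $impl")
        }
    }
    return try {
        // Blocks until one task succeeds; tasks that have not completed are then cancelled.
        es.invokeAny(tasks)
    } catch (e: ExecutionException) {
        null // every task threw
    }
}
```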
Another answer:

This is actually exactly what the select API is designed for:

coroutineScope {
    select<String?> {
        impls.forEach { impl ->
            async {
               impl.getShortUrl(longUrl)
            }.onAwait { it }
        }
    }.also {
        coroutineContext.cancelChildren() // Cancel any requests that are still going.
    }
}

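Note that this won't handle exceptions thrown by the service implementations, and it returns whatever the first finished request produced, even null. If you actually want to handle those cases, you'll need a supervisorScope with a custom exception handler and a filtering select loop.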
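A minimal sketch of such a filtering loop, assuming suspendable implementations. Instead of a supervisorScope with a custom exception handler, it catches failures inside each async, so no failed request can cancel the scope; that is a simplification swapped in for illustration. Each round selects whichever pending Deferred completes next, drops it from the list, and returns the first non-null value. hedgeWithSelect is a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

interface UrlShortener {
    suspend fun getShortUrl(longUrl: String): String?
}

suspend fun hedgeWithSelect(impls: List<UrlShortener>, longUrl: String): String? = coroutineScope {
    // Failures are caught per request, so one failing service cannot cancel the others.
    var pending: List<Deferred<String?>> = impls.map { impl ->
        async {
            try {
                impl.getShortUrl(longUrl)
            } catch (e: CancellationException) {
                throw e // let cancellation propagate
            } catch (e: Exception) {
                null
            }
        }
    }
    try {
        while (pending.isNotEmpty()) {
            // Wait for whichever remaining request finishes next.
            val (done, url) = select<Pair<Deferred<String?>, String?>> {
                pending.forEach { d -> d.onAwait { d to it } }
            }
            if (url != null) return@coroutineScope url
            pending = pending - done // null result: keep waiting for the others
        }
        null
    } finally {
        coroutineContext.cancelChildren() // cancel requests that are still running
    }
}
```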

Article link: https://www.f2er.com/3149770.html
