Spring的反应堆具有一个有趣的功能:对冲。这意味着产生许多请求并获得第一个返回的结果,并自动清除其他上下文。 Josh Long最近一直在积极推广此功能。谷歌搜索Spring reactor hedging显示相对结果。如果有人好奇,示例代码为here。简而言之,Flux.first()
简化了所有底层的麻烦,这令人印象深刻。
我想知道如何通过Kotlin的协程和多线程(甚至可以使用Flow
或Channel
)来实现。我想到了一个简单的方案:一个服务接受longUrl并生成longUrl到许多URL缩短服务(例如IsGd,TinyUrl ...),并返回第一个返回的URL ...(并终止/清除其他线程/协程资源)
有一个定义此工作的接口UrlShorter
:
interface UrlShorter {
fun getShortUrl(longUrl: String): String?
}
有3种实现,一种用于is.gd,另一种用于tinyUrl,第三种是Dumb实现,它会阻塞10秒并返回null:
class IsgdImpl : UrlShorter {
override fun getShortUrl(longUrl: String): String? {
logger.info("running : {}",Thread.currentThread().name)
// isGd api url blocked by SO,it sucks . see the underlaying gist for full code
val url = "https://is.gd/_create.php?format=simple&url=%s".format(URLEncoder.encode(longUrl,"UTF-8"))
return Request.Get(url).execute().returnContent().asString().also {
logger.info("returning {}",it)
}
}
}
class TinyImpl : UrlShorter {
override fun getShortUrl(longUrl: String): String? {
logger.info("running : {}",Thread.currentThread().name)
val url = "http://tinyurl.com/_api-create.php?url=$longUrl" // sorry the URL is blocked by stackoverflow,see the underlaying gist for full code
return Request.Get(url).execute().returnContent().asString().also {
logger.info("returning {}",it)
}
}
}
class DumbImpl : UrlShorter {
override fun getShortUrl(longUrl: String): String? {
logger.info("running : {}",Thread.currentThread().name)
TimeUnit.SECONDS.sleep(10)
return null
}
}
有一个UrlShorterService
接受所有UrlShorter
的实现,并尝试生成协程并获得第一个结果。
这就是我想到的:
@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterService(private val impls: List<UrlShorter>) {
private val es: ExecutorService = Executors.newFixedThreadPool(impls.size)
private val esDispatcher = es.asCoroutineDispatcher()
suspend fun getShortUrl(longUrl: String): String {
return method1(longUrl) // there are other methods,with different ways...
}
private inline fun <T,R : Any> Iterable<T>.firstNotNullResult(transform: (T) -> R?): R? {
for (element in this) {
val result = transform(element)
if (result != null) return result
}
return null
}
客户端也很简单:
@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterServiceTest {
@Test
fun testHedging() {
val impls = listOf(DumbImpl(),IsgdImpl(),TinyImpl()) // Dumb first
val service = UrlShorterService(impls)
runBlocking {
service.getShortUrl("https://www.google.com").also {
logger.info("result = {}",it)
}
}
}
}
请注意,我将DumbImpl
放在第一位,因为我希望它可以首先生成并阻塞其线程。其他两个实现都可以得到结果。
好,这是问题所在,如何在Kotlin中实现套期保值?我尝试以下方法:
private suspend fun method1(longUrl: String): String {
return impls.asSequence().asFlow().flatMapMerge(impls.size) { impl ->
flow {
impl.getShortUrl(longUrl)?.also {
emit(it)
}
}.flowOn(esDispatcher)
}.first()
.also { esDispatcher.cancelChildren() } // doesn't impact the result
}
我希望method1
应该起作用,但是它完全可以执行10秒:
00:56:09,253 INFO TinyImpl - running : pool-1-thread-3
00:56:09,254 INFO DumbImpl - running : pool-1-thread-1
00:56:09,253 INFO IsgdImpl - running : pool-1-thread-2
00:56:11,150 INFO TinyImpl - returning // tiny url blocked by SO,it sucks
00:56:13,604 INFO IsgdImpl - returning // idGd url blocked by SO,it sucks
00:56:19,261 INFO UrlShorterServiceTest$testHedging$1 - result = // tiny url blocked by SO,it sucks
然后,我以为其他method2,method3,method4,method5 ...都不能用:
/**
* 00:54:29,035 INFO IsgdImpl - running : pool-1-thread-3
* 00:54:29,036 INFO DumbImpl - running : pool-1-thread-2
* 00:54:29,035 INFO TinyImpl - running : pool-1-thread-1
* 00:54:30,228 INFO TinyImpl - returning // tiny url blocked by SO,it sucks
* 00:54:30,797 INFO IsgdImpl - returning // idGd url blocked by SO,it sucks
* 00:54:39,046 INFO UrlShorterServiceTest$testHedging$1 - result = // idGd url blocked by SO,it sucks
*/
private suspend fun method2(longUrl: String): String {
return withContext(esDispatcher) {
impls.map { impl ->
async(esDispatcher) {
impl.getShortUrl(longUrl)
}
}.firstNotNullResult { it.await() } ?: longUrl
}
}
/**
* 00:52:30,681 INFO IsgdImpl - running : pool-1-thread-2
* 00:52:30,682 INFO DumbImpl - running : pool-1-thread-1
* 00:52:30,681 INFO TinyImpl - running : pool-1-thread-3
* 00:52:31,838 INFO TinyImpl - returning // tiny url blocked by SO,it sucks
* 00:52:33,721 INFO IsgdImpl - returning // idGd url blocked by SO,it sucks
* 00:52:40,691 INFO UrlShorterServiceTest$testHedging$1 - result = // idGd url blocked by SO,it sucks
*/
private suspend fun method3(longUrl: String): String {
return coroutinescope {
impls.map { impl ->
async(esDispatcher) {
impl.getShortUrl(longUrl)
}
}.firstNotNullResult { it.await() } ?: longUrl
}
}
/**
* 01:58:56,930 INFO TinyImpl - running : pool-1-thread-1
* 01:58:56,933 INFO DumbImpl - running : pool-1-thread-2
* 01:58:56,930 INFO IsgdImpl - running : pool-1-thread-3
* 01:58:58,411 INFO TinyImpl - returning // tiny url blocked by SO,it sucks
* 01:58:59,026 INFO IsgdImpl - returning // idGd url blocked by SO,it sucks
* 01:59:06,942 INFO UrlShorterServiceTest$testHedging$1 - result = // idGd url blocked by SO,it sucks
*/
private suspend fun method4(longUrl: String): String {
return withContext(esDispatcher) {
impls.map { impl ->
async {
impl.getShortUrl(longUrl)
}
}.firstNotNullResult { it.await() } ?: longUrl
}
}
我对Channel
不熟悉,为例外↓
/**
* 01:29:44,460 INFO UrlShorterService$method5$2 - channel closed
* 01:29:44,461 INFO DumbImpl - running : pool-1-thread-2
* 01:29:44,460 INFO IsgdImpl - running : pool-1-thread-3
* 01:29:44,466 INFO TinyImpl - running : pool-1-thread-1
* 01:29:45,765 INFO TinyImpl - returning // tiny url blocked by SO,it sucks
* 01:29:46,339 INFO IsgdImpl - returning // idGd url blocked by SO,it sucks
*
* kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
*
*/
private suspend fun method5(longUrl: String): String {
val channel = Channel<String>()
withContext(esDispatcher) {
impls.forEach { impl ->
launch {
impl.getShortUrl(longUrl)?.also {
channel.send(it)
}
}
}
channel.close()
logger.info("channel closed")
}
return channel.consumeAsFlow().first()
}
好的,我不知道是否还有其他方法...但是以上所有方法均无效...所有阻止至少10秒(被DumbImpl
阻止)。
整个源代码可以在github gist上找到。
如何在Kotlin中实现套期保值?通过Deferred
或Flow
或Channel
还是其他更好的主意?谢谢。
提交问题后,我发现所有tinyurl,isGd url被SO阻止。真的很烂!