select——>用于选择更快的结果。
基于场景理解
比如客户端要查询一个商品的详情。两个服务:缓存服务,速度快但信息可能是旧的;网络服务,速度慢但信息一定是最新的。
如何实现上述逻辑:
- runBlocking {
- suspend fun getCacheInfo(productId: String): Product {
- delay(100L)
- return Product(productId, 8.9)
- }
-
- suspend fun getNetworkInfo(productId: String): Product? {
- delay(200L)
- return Product(productId, 8.8)
- }
-
- fun updateUI(product: Product) {
- println("${product.productId} : ${product.price}")
- }
-
- val startTime = System.currentTimeMillis()
- val productId = "001"
- val cacheInfo = getCacheInfo(productId)
- if (cacheInfo != null) {
- updateUI(cacheInfo)
- println("Time cost: ${System.currentTimeMillis() - startTime}")
- }
-
- val latestInfo = getNetworkInfo(productId)
- if (latestInfo != null) {
- updateUI(latestInfo)
- println("Time cost: ${System.currentTimeMillis() - startTime}")
- }
- }
-
-
- 001 : 8.9
- Time cost: 113
- 001 : 8.8
- Time cost: 324
上述程序分为四步:第一步:查询缓存信息;第二步:缓存服务返回信息,更新 UI;第三步:查询网络服务;第四步:网络服务返回信息,更新 UI。
用户可以第一时间看到商品的信息,虽然它暂时会展示旧的信息,但由于我们同时查询了网络服务,旧缓存信息也马上会被替代成新的信息。但是可能存在一些问题:如果程序卡在了缓存服务,获取网络服务就会无法执行。是因为 getCacheInfo() 它是一个挂起函数,只有这个程序执行成功以后,才可以继续执行后面的任务。能否做到:两个挂起函数同时执行,谁返回的速度更快,就选择哪个结果。答案是使用select。
- runBlocking {
-
- suspend fun getCacheInfo(productId: String): Product {
- delay(100L)
- return Product(productId, 8.9)
- }
-
- suspend fun getNetworkInfo(productId: String): Product {
- delay(200L)
- return Product(productId, 8.8)
- }
-
- fun updateUI(product: Product) {
- println("${product.productId} : ${product.price}")
- }
-
- val startTime = System.currentTimeMillis()
- val productId = "001"
-
- val product = select
{ -
- async {
- getCacheInfo(productId)
- }.onAwait {
- it
- }
-
- async {
- getNetworkInfo(productId)
- }.onAwait {
- it
- }
- }
-
- if (product != null) {
- updateUI(product)
- println("Time cost: ${System.currentTimeMillis() - startTime}")
- }
- }
-
-
- 001 : 8.9
- Time cost: 134
-
- Process finished with exit code 0
由于缓存的服务更快,所以,select 确实帮我们选择了更快的那个结果。我们的 select 可以在缓存服务出现问题的时候,灵活选择网络服务的结果。从而避免用户等待太长的时间,得到糟糕的体验。
在上述代码中,用户大概率是会展示旧的缓存信息。但实际场景下,我们是需要进一步更新最新信息的。
- runBlocking {
- suspend fun getCacheInfo(productId: String): Product {
- delay(100L)
- return Product(productId, 8.9)
- }
-
- suspend fun getNetworkInfo(productId: String): Product {
- delay(200L)
- return Product(productId, 8.8)
- }
-
- fun updateUI(product: Product) {
- println("${product.productId} : ${product.price}")
- }
-
- val startTime = System.currentTimeMillis()
- val productId = "001"
-
- val cacheDeferred = async {
- getCacheInfo(productId)
- }
- val latestDeferred = async {
- getNetworkInfo(productId)
- }
-
- val product = select
{ -
- cacheDeferred.onAwait {
- it.copy(isCache = true)
- }
-
- latestDeferred.onAwait {
- it.copy(isCache = false)
- }
- }
-
- if (product != null) {
- updateUI(product)
- println("Time cost: ${System.currentTimeMillis() - startTime}")
- }
-
- if (product != null && product.isCache) {
- val latest = latestDeferred.await() ?: return@runBlocking
- updateUI(latest)
- println("Time cost: ${System.currentTimeMillis() - startTime}")
- }
-
- }
-
-
- 001 : 8.9
- Time cost: 124
- 001 : 8.8
- Time cost: 228
-
- Process finished with exit code 0
如果是多个服务的缓存场景呢?
- runBlocking {
-
- val startTime = System.currentTimeMillis()
- val productId = "001"
-
-
- suspend fun getCacheInfo(productId: String): Product {
- delay(100L)
- return Product(productId, 8.9)
- }
-
- suspend fun getCacheInfo2(productId: String): Product {
- delay(50L)
- return Product(productId, 8.85)
- }
-
- suspend fun getNetworkInfo(productId: String): Product {
- delay(200L)
- return Product(productId, 8.8)
- }
-
- fun updateUI(product: Product) {
- println("${product.productId} : ${product.price}")
- }
-
- val cacheDeferred = async {
- getCacheInfo(productId)
- }
-
- val cacheDeferred2 = async {
- getCacheInfo2(productId)
- }
-
- val latestDeferred = async {
- getNetworkInfo(productId)
- }
-
- val product = select
{ - cacheDeferred.onAwait {
- it.copy(isCache = true)
- }
-
- cacheDeferred2.onAwait {
- it.copy(isCache = true)
- }
-
- latestDeferred.onAwait {
- it.copy(isCache = true)
- }
- }
- if (product != null) {
- updateUI(product)
- println("Time cost: ${System.currentTimeMillis() - startTime}")
- }
-
- if (product != null && product.isCache) {
- val latest = latestDeferred.await()
- updateUI(latest)
- println("Time cost: ${System.currentTimeMillis() - startTime}")
- }
-
- }
-
-
- Log
-
- 001 : 8.85
- Time cost: 79
- 001 : 8.8
- Time cost: 229
-
- Process finished with exit code 0
select 代码模式,可以提升程序的整体响应速度。
- runBlocking {
- val startTime = System.currentTimeMillis()
- val channel1 = produce {
- send(1)
- delay(200L)
- send(2)
- delay(200L)
- send(3)
- }
-
- val channel2 = produce {
- delay(100L)
- send("a")
- delay(200L)
- send("b")
- delay(200L)
- send("c")
- }
-
-
-
- channel1.consumeEach {
- println(it)
- }
- channel2.consumeEach {
- println(it)
- }
-
- println("Time cost: ${System.currentTimeMillis() - startTime}")
- }
-
-
- Log
-
- 1
- 2
- 3
- a
- b
- c
- Time cost: 853
-
- Process finished with exit code 0
上述代码串行执行,可以使用select进行优化。
- runBlocking {
- val startTime = System.currentTimeMillis()
- val channel1 = produce {
- send(1)
- delay(200L)
- send(2)
- delay(200L)
- send(3)
- }
-
- val channel2 = produce {
- delay(100L)
- send("a")
- delay(200L)
- send("b")
- delay(200L)
- send("c")
- }
-
- suspend fun selectChannel(
- channel1: ReceiveChannel<Int>,
- channel2: ReceiveChannel<String>
- ): Any {
- return select
{ - if (!channel1.isClosedForReceive) {
- channel1.onReceive {
- it.also {
- println(it)
- }
- }
- }
-
- if (!channel2.isClosedForReceive) {
- channel2.onReceive {
- it.also {
- println(it)
- }
- }
- }
- }
- }
-
- repeat(6) {
- selectChannel(channel1, channel2)
- }
- println("Time cost: ${System.currentTimeMillis() - startTime}")
- }
-
- Log
- 1
- a
- 2
- b
- 3
- c
- Time cost: 574
-
- Process finished with exit code 0
从代码执行结果可以发现程序的执行耗时有效减少。onReceive{} 是 Channel 在 select 当中的语法,当 Channel 当中有数据以后,它就会被回调,通过这个 Lambda,将结果传出去。 执行了 6 次 select,目的是要把两个管道中的所有数据都消耗掉。
如果Channel1不生产数据了,程序会如何执行?
- runBlocking {
- val startTime = System.currentTimeMillis()
- val channel1 = produce
{ - delay(5000L)
- }
-
- val channel2 = produce
{ - delay(100L)
- send("a")
- delay(200L)
- send("b")
- delay(200L)
- send("c")
- }
-
- suspend fun selectChannel(
- channel1: ReceiveChannel<String>,
- channel2: ReceiveChannel<String>
- ): String = select
{ - channel1.onReceive {
- it.also {
- println(it)
- }
- }
- channel2.onReceive {
- it.also {
- println(it)
- }
- }
- }
- repeat(3) {
- selectChannel(channel1, channel2)
- }
- println("Time cost: ${System.currentTimeMillis() - startTime}")
- }
-
- Log
- a
- b
- c
- Time cost: 570
-
- Process finished with exit code 0
如果不知道Channel的个数,如何避免ClosedReceiveChannelException?
使用:onReceiveCatching{}
- runBlocking {
- val startTime = System.currentTimeMillis()
- val channel1 = produce
{ - delay(5000L)
- }
-
- val channel2 = produce
{ - delay(100L)
- send("a")
- delay(200L)
- send("b")
- delay(200L)
- send("c")
- }
-
-
- suspend fun selectChannel(
- channel1: ReceiveChannel<String>,
- channel2: ReceiveChannel<String>
- ): String = select
{ - channel1.onReceiveCatching {
- it.getOrNull() ?: "channel1 is closed!"
- }
- channel2.onReceiveCatching {
- it.getOrNull() ?: "channel2 is closed!"
- }
- }
-
- repeat(6) {
- val result = selectChannel(channel1, channel2)
- println(result)
- }
-
- println("Time cost: ${System.currentTimeMillis() - startTime}")
- }
-
- Log
- a
- b
- c
- channel2 is closed!
- channel2 is closed!
- channel2 is closed!
- Time cost: 584
-
- Process finished with exit code 0
得到所有结果以后,程序不会立即退出,因为 channel1 一直在 delay()。
所以我们需要在6次repeat之后将channel关闭。
- runBlocking {
- val startTime = System.currentTimeMillis()
- val channel1 = produce
{ - delay(15000L)
- }
-
- val channel2 = produce
{ - delay(100L)
- send("a")
- delay(200L)
- send("b")
- delay(200L)
- send("c")
- }
-
-
- suspend fun selectChannel(
- channel1: ReceiveChannel<String>,
- channel2: ReceiveChannel<String>
- ): String = select
{ - channel1.onReceiveCatching {
- it.getOrNull() ?: "channel1 is closed!"
- }
- channel2.onReceiveCatching {
- it.getOrNull() ?: "channel2 is closed!"
- }
- }
-
- repeat(6) {
- val result = selectChannel(channel1, channel2)
- println(result)
- }
-
- channel1.cancel()
- channel2.cancel()
- println("Time cost: ${System.currentTimeMillis() - startTime}")
- }
-
- Log
- a
- b
- c
- channel2 is closed!
- channel2 is closed!
- channel2 is closed!
- Time cost: 612
-
- Process finished with exit code 0
Deferred、Channel 的 API:
-
- public interface Deferred : CoroutineContext.Element {
- public suspend fun join()
- public suspend fun await(): T
-
- public val onJoin: SelectClause0
- public val onAwait: SelectClause1
- }
-
- public interface SendChannel<in E>
- public suspend fun send(element: E)
-
-
- public val onSend: SelectClause2
> -
- }
-
- public interface ReceiveChannel<out E> {
- public suspend fun receive(): E
-
- public suspend fun receiveCatching(): ChannelResult
-
- public val onReceive: SelectClause1
- public val onReceiveCatching: SelectClause1
> - }
当 select 与 Deferred 结合使用的时候,当并行的 Deferred 比较多的时候,你往往需要在得到一个最快的结果以后,去取消其他的 Deferred。
通过 async 并发执行协程,也可以借助 select 得到最快的结果。
- runBlocking {
- suspend fun
fastest(vararg deferreds: Deferred<T>): T = select { - fun cancelAll() = deferreds.forEach {
- it.cancel()
- }
-
- for (deferred in deferreds) {
- deferred.onAwait {
- cancelAll()
- it
- }
- }
- }
-
- val deferred1 = async {
- delay(100L)
- println("done1")
- "result1"
- }
-
- val deferred2 = async {
- delay(200L)
- println("done2")
- "result2"
- }
-
-
- val deferred3 = async {
- delay(300L)
- println("done3")
- "result3"
- }
-
-
- val deferred4 = async {
- delay(400L)
- println("done4")
- "result4"
- }
-
- val deferred5 = async {
- delay(5000L)
- println("done5")
- "result5"
- }
-
- val fastest = fastest(deferred1, deferred2, deferred3, deferred4, deferred5)
- println(fastest)
- }
-
- Log
-
- done1
- result1
-
- Process finished with exit code 0