• Kotlin协程-select基础


    一、select是什么?

    select——>用于选择更快的结果。

    基于场景理解

    比如客户端要查询一个商品的详情。两个服务:缓存服务,速度快但信息可能是旧的;网络服务,速度慢但信息一定是最新的。

    如何实现上述逻辑:

    1. runBlocking {
    2. suspend fun getCacheInfo(productId: String): Product {
    3. delay(100L)
    4. return Product(productId, 8.9)
    5. }
    6. suspend fun getNetworkInfo(productId: String): Product? {
    7. delay(200L)
    8. return Product(productId, 8.8)
    9. }
    10. fun updateUI(product: Product) {
    11. println("${product.productId} : ${product.price}")
    12. }
    13. val startTime = System.currentTimeMillis()
    14. val productId = "001"
    15. val cacheInfo = getCacheInfo(productId)
    16. if (cacheInfo != null) {
    17. updateUI(cacheInfo)
    18. println("Time cost: ${System.currentTimeMillis() - startTime}")
    19. }
    20. val latestInfo = getNetworkInfo(productId)
    21. if (latestInfo != null) {
    22. updateUI(latestInfo)
    23. println("Time cost: ${System.currentTimeMillis() - startTime}")
    24. }
    25. }
    26. 001 : 8.9
    27. Time cost: 113
    28. 001 : 8.8
    29. Time cost: 324

    上述程序分为四步:第一步:查询缓存信息;第二步:缓存服务返回信息,更新 UI;第三步:查询网络服务;第四步:网络服务返回信息,更新 UI。

    用户可以第一时间看到商品的信息,虽然它暂时会展示旧的信息,但由于我们同时查询了网络服务,旧缓存信息也马上会被替代成新的信息。但是可能存在一些问题:如果程序卡在了缓存服务,获取网络服务就会无法执行。是因为 getCacheInfo() 它是一个挂起函数,只有这个程序执行成功以后,才可以继续执行后面的任务。能否做到:两个挂起函数同时执行,谁返回的速度更快,就选择哪个结果。答案是使用select。

    1. runBlocking {
    2. suspend fun getCacheInfo(productId: String): Product {
    3. delay(100L)
    4. return Product(productId, 8.9)
    5. }
    6. suspend fun getNetworkInfo(productId: String): Product {
    7. delay(200L)
    8. return Product(productId, 8.8)
    9. }
    10. fun updateUI(product: Product) {
    11. println("${product.productId} : ${product.price}")
    12. }
    13. val startTime = System.currentTimeMillis()
    14. val productId = "001"
    15. val product = select {
    16. async {
    17. getCacheInfo(productId)
    18. }.onAwait {
    19. it
    20. }
    21. async {
    22. getNetworkInfo(productId)
    23. }.onAwait {
    24. it
    25. }
    26. }
    27. if (product != null) {
    28. updateUI(product)
    29. println("Time cost: ${System.currentTimeMillis() - startTime}")
    30. }
    31. }
    32. 001 : 8.9
    33. Time cost: 134
    34. Process finished with exit code 0

    由于缓存的服务更快,所以,select 确实帮我们选择了更快的那个结果。我们的 select 可以在缓存服务出现问题的时候,灵活选择网络服务的结果。从而避免用户等待太长的时间,得到糟糕的体验。

    在上述代码中,用户大概率是会展示旧的缓存信息。但实际场景下,我们是需要进一步更新最新信息的。

    1. runBlocking {
    2. suspend fun getCacheInfo(productId: String): Product {
    3. delay(100L)
    4. return Product(productId, 8.9)
    5. }
    6. suspend fun getNetworkInfo(productId: String): Product {
    7. delay(200L)
    8. return Product(productId, 8.8)
    9. }
    10. fun updateUI(product: Product) {
    11. println("${product.productId} : ${product.price}")
    12. }
    13. val startTime = System.currentTimeMillis()
    14. val productId = "001"
    15. val cacheDeferred = async {
    16. getCacheInfo(productId)
    17. }
    18. val latestDeferred = async {
    19. getNetworkInfo(productId)
    20. }
    21. val product = select {
    22. cacheDeferred.onAwait {
    23. it.copy(isCache = true)
    24. }
    25. latestDeferred.onAwait {
    26. it.copy(isCache = false)
    27. }
    28. }
    29. if (product != null) {
    30. updateUI(product)
    31. println("Time cost: ${System.currentTimeMillis() - startTime}")
    32. }
    33. if (product != null && product.isCache) {
    34. val latest = latestDeferred.await() ?: return@runBlocking
    35. updateUI(latest)
    36. println("Time cost: ${System.currentTimeMillis() - startTime}")
    37. }
    38. }
    39. 001 : 8.9
    40. Time cost: 124
    41. 001 : 8.8
    42. Time cost: 228
    43. Process finished with exit code 0

    如果是多个服务的缓存场景呢?

    1. runBlocking {
    2. val startTime = System.currentTimeMillis()
    3. val productId = "001"
    4. suspend fun getCacheInfo(productId: String): Product {
    5. delay(100L)
    6. return Product(productId, 8.9)
    7. }
    8. suspend fun getCacheInfo2(productId: String): Product {
    9. delay(50L)
    10. return Product(productId, 8.85)
    11. }
    12. suspend fun getNetworkInfo(productId: String): Product {
    13. delay(200L)
    14. return Product(productId, 8.8)
    15. }
    16. fun updateUI(product: Product) {
    17. println("${product.productId} : ${product.price}")
    18. }
    19. val cacheDeferred = async {
    20. getCacheInfo(productId)
    21. }
    22. val cacheDeferred2 = async {
    23. getCacheInfo2(productId)
    24. }
    25. val latestDeferred = async {
    26. getNetworkInfo(productId)
    27. }
    28. val product = select {
    29. cacheDeferred.onAwait {
    30. it.copy(isCache = true)
    31. }
    32. cacheDeferred2.onAwait {
    33. it.copy(isCache = true)
    34. }
    35. latestDeferred.onAwait {
    36. it.copy(isCache = true)
    37. }
    38. }
    39. if (product != null) {
    40. updateUI(product)
    41. println("Time cost: ${System.currentTimeMillis() - startTime}")
    42. }
    43. if (product != null && product.isCache) {
    44. val latest = latestDeferred.await()
    45. updateUI(latest)
    46. println("Time cost: ${System.currentTimeMillis() - startTime}")
    47. }
    48. }
    49. Log
    50. 001 : 8.85
    51. Time cost: 79
    52. 001 : 8.8
    53. Time cost: 229
    54. Process finished with exit code 0

    select 代码模式,可以提升程序的整体响应速度。

    二、select 和 Channel

    1. runBlocking {
    2. val startTime = System.currentTimeMillis()
    3. val channel1 = produce {
    4. send(1)
    5. delay(200L)
    6. send(2)
    7. delay(200L)
    8. send(3)
    9. }
    10. val channel2 = produce {
    11. delay(100L)
    12. send("a")
    13. delay(200L)
    14. send("b")
    15. delay(200L)
    16. send("c")
    17. }
    18. channel1.consumeEach {
    19. println(it)
    20. }
    21. channel2.consumeEach {
    22. println(it)
    23. }
    24. println("Time cost: ${System.currentTimeMillis() - startTime}")
    25. }
    26. Log
    27. 1
    28. 2
    29. 3
    30. a
    31. b
    32. c
    33. Time cost: 853
    34. Process finished with exit code 0

    上述代码串行执行,可以使用select进行优化。

    1. runBlocking {
    2. val startTime = System.currentTimeMillis()
    3. val channel1 = produce {
    4. send(1)
    5. delay(200L)
    6. send(2)
    7. delay(200L)
    8. send(3)
    9. }
    10. val channel2 = produce {
    11. delay(100L)
    12. send("a")
    13. delay(200L)
    14. send("b")
    15. delay(200L)
    16. send("c")
    17. }
    18. suspend fun selectChannel(
    19. channel1: ReceiveChannel<Int>,
    20. channel2: ReceiveChannel<String>
    21. ): Any {
    22. return select {
    23. if (!channel1.isClosedForReceive) {
    24. channel1.onReceive {
    25. it.also {
    26. println(it)
    27. }
    28. }
    29. }
    30. if (!channel2.isClosedForReceive) {
    31. channel2.onReceive {
    32. it.also {
    33. println(it)
    34. }
    35. }
    36. }
    37. }
    38. }
    39. repeat(6) {
    40. selectChannel(channel1, channel2)
    41. }
    42. println("Time cost: ${System.currentTimeMillis() - startTime}")
    43. }
    44. Log
    45. 1
    46. a
    47. 2
    48. b
    49. 3
    50. c
    51. Time cost: 574
    52. Process finished with exit code 0

    从代码执行结果可以发现程序的执行耗时有效减少。onReceive{} 是 Channel 在 select 当中的语法,当 Channel 当中有数据以后,它就会被回调,通过这个 Lambda,将结果传出去。 执行了 6 次 select,目的是要把两个管道中的所有数据都消耗掉。

    如果Channel1不生产数据了,程序会如何执行?

    1. runBlocking {
    2. val startTime = System.currentTimeMillis()
    3. val channel1 = produce {
    4. delay(5000L)
    5. }
    6. val channel2 = produce {
    7. delay(100L)
    8. send("a")
    9. delay(200L)
    10. send("b")
    11. delay(200L)
    12. send("c")
    13. }
    14. suspend fun selectChannel(
    15. channel1: ReceiveChannel<String>,
    16. channel2: ReceiveChannel<String>
    17. ): String = select {
    18. channel1.onReceive {
    19. it.also {
    20. println(it)
    21. }
    22. }
    23. channel2.onReceive {
    24. it.also {
    25. println(it)
    26. }
    27. }
    28. }
    29. repeat(3) {
    30. selectChannel(channel1, channel2)
    31. }
    32. println("Time cost: ${System.currentTimeMillis() - startTime}")
    33. }
    34. Log
    35. a
    36. b
    37. c
    38. Time cost: 570
    39. Process finished with exit code 0

     

    如果不知道Channel的个数,如何避免ClosedReceiveChannelException?

    使用:onReceiveCatching{} 

    1. runBlocking {
    2. val startTime = System.currentTimeMillis()
    3. val channel1 = produce {
    4. delay(5000L)
    5. }
    6. val channel2 = produce {
    7. delay(100L)
    8. send("a")
    9. delay(200L)
    10. send("b")
    11. delay(200L)
    12. send("c")
    13. }
    14. suspend fun selectChannel(
    15. channel1: ReceiveChannel<String>,
    16. channel2: ReceiveChannel<String>
    17. ): String = select {
    18. channel1.onReceiveCatching {
    19. it.getOrNull() ?: "channel1 is closed!"
    20. }
    21. channel2.onReceiveCatching {
    22. it.getOrNull() ?: "channel2 is closed!"
    23. }
    24. }
    25. repeat(6) {
    26. val result = selectChannel(channel1, channel2)
    27. println(result)
    28. }
    29. println("Time cost: ${System.currentTimeMillis() - startTime}")
    30. }
    31. Log
    32. a
    33. b
    34. c
    35. channel2 is closed!
    36. channel2 is closed!
    37. channel2 is closed!
    38. Time cost: 584
    39. Process finished with exit code 0

    得到所有结果以后,程序不会立即退出,因为 channel1 一直在 delay()。

    所以我们需要在6次repeat之后将channel关闭。

    1. runBlocking {
    2. val startTime = System.currentTimeMillis()
    3. val channel1 = produce {
    4. delay(15000L)
    5. }
    6. val channel2 = produce {
    7. delay(100L)
    8. send("a")
    9. delay(200L)
    10. send("b")
    11. delay(200L)
    12. send("c")
    13. }
    14. suspend fun selectChannel(
    15. channel1: ReceiveChannel<String>,
    16. channel2: ReceiveChannel<String>
    17. ): String = select {
    18. channel1.onReceiveCatching {
    19. it.getOrNull() ?: "channel1 is closed!"
    20. }
    21. channel2.onReceiveCatching {
    22. it.getOrNull() ?: "channel2 is closed!"
    23. }
    24. }
    25. repeat(6) {
    26. val result = selectChannel(channel1, channel2)
    27. println(result)
    28. }
    29. channel1.cancel()
    30. channel2.cancel()
    31. println("Time cost: ${System.currentTimeMillis() - startTime}")
    32. }
    33. Log
    34. a
    35. b
    36. c
    37. channel2 is closed!
    38. channel2 is closed!
    39. channel2 is closed!
    40. Time cost: 612
    41. Process finished with exit code 0

     Deferred、Channel 的 API:

    1. public interface Deferred : CoroutineContext.Element {
    2. public suspend fun join()
    3. public suspend fun await(): T
    4. public val onJoin: SelectClause0
    5. public val onAwait: SelectClause1
    6. }
    7. public interface SendChannel<in E>
    8. public suspend fun send(element: E)
    9. public val onSend: SelectClause2>
    10. }
    11. public interface ReceiveChannel<out E> {
    12. public suspend fun receive(): E
    13. public suspend fun receiveCatching(): ChannelResult
    14. public val onReceive: SelectClause1
    15. public val onReceiveCatching: SelectClause1>
    16. }

    当 select 与 Deferred 结合使用的时候,当并行的 Deferred 比较多的时候,你往往需要在得到一个最快的结果以后,去取消其他的 Deferred。

    通过 async 并发执行协程,也可以借助 select 得到最快的结果。

    1. runBlocking {
    2. suspend fun fastest(vararg deferreds: Deferred<T>): T = select {
    3. fun cancelAll() = deferreds.forEach {
    4. it.cancel()
    5. }
    6. for (deferred in deferreds) {
    7. deferred.onAwait {
    8. cancelAll()
    9. it
    10. }
    11. }
    12. }
    13. val deferred1 = async {
    14. delay(100L)
    15. println("done1")
    16. "result1"
    17. }
    18. val deferred2 = async {
    19. delay(200L)
    20. println("done2")
    21. "result2"
    22. }
    23. val deferred3 = async {
    24. delay(300L)
    25. println("done3")
    26. "result3"
    27. }
    28. val deferred4 = async {
    29. delay(400L)
    30. println("done4")
    31. "result4"
    32. }
    33. val deferred5 = async {
    34. delay(5000L)
    35. println("done5")
    36. "result5"
    37. }
    38. val fastest = fastest(deferred1, deferred2, deferred3, deferred4, deferred5)
    39. println(fastest)
    40. }
    41. Log
    42. done1
    43. result1
    44. Process finished with exit code 0

     

  • 相关阅读:
    09.06app端自动化
    热力学相关的两个定律
    鸿蒙开发实例:【配置OpenHarmony SDK】
    Git 学习笔记
    【优化方案】Java 将字符串中的星号替换为0-9中的数字,并返回所有可能的替换结果
    2022.7.29好题选讲(计数专题)
    猿创征文|DEM分析分层重分类
    virtio代码分析(一)-qemu部分
    老子云平台会员专业又有性价比!
    Java 类和对象
  • 原文地址:https://blog.csdn.net/zhangying1994/article/details/127485681