• BroadcastChannel全解析


    前言

    对于 channel 而言,更多面向的是一对一的情况,因为一旦 receive() 后,该数据就被获取了,后续再 receive() ,也无法拿到之前的数据。但是,类似的,Flow 与 sharedflow 是相对应的,与 channel 相对应的是 BroadcastChannel。

    使用

            GlobalScope.launch {
                List(2) { index ->
                    launch {
                        val receiver = channel.openSubscription()
                        val iterator = receiver.iterator()
                        while (iterator.hasNext()) {
                            println("channel $index 消费了 ${iterator.next()}")
                        }
                    }
                }
                delay(1000)
                
                launch {
                    (1..3).forEach { value ->
                        println("channel 生产了 $value")
                        channel.send(value)
                    }
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    日志输出:

    I/System.out: channel 生产了 1
    I/System.out: channel 1 消费了 1
    I/System.out: channel 0 消费了 1
    I/System.out: channel 生产了 2
    I/System.out: channel 1 消费了 2
    I/System.out: channel 0 消费了 2
    I/System.out: channel 生产了 3
    I/System.out: channel 1 消费了 3
    I/System.out: channel 0 消费了 3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    BroadcastChannel 并不是直接通过 receive() 进行获取数据,而且先执行 openSubscription() 获得 ReceiveChannel,然后在通过 ReceiveChannel 去进行 receive() ,在这里我为了方便遍历就使用了 iterator()

    有一点要额外注意,就是 sharedflow 和 stateflow 都是先 emit() 数据,再进行 collect() 的,而 BroadcastChannel 是刚好相反,是先注册,然后再获取数据的,所以,当 BroadcastChannel 已经 send() 后,再执行 openSubscription() 是无法获取数据的。下面我们通过这个小栗子验证下:

    
            val channel = BroadcastChannel(3)
    
            GlobalScope.launch {
                launch {
                    val receiver = channel.openSubscription()
                    val iterator = receiver.iterator()
                    while (iterator.hasNext()) {
                        println("channel 1 消费了 ${iterator.next()}")
                    }
                }
                delay(1000)
    
                launch {
                    (1..3).forEach { value ->
                        println("channel 生产了 $value")
                        channel.send(value)
                    }
                }
    
                delay(1000)
    
                launch {
                    val receiver = channel.openSubscription()
                    val iterator = receiver.iterator()
                    while (iterator.hasNext()) {
                        println("channel 2 消费了 ${iterator.next()}")
                    }
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    日志输出:

    I/System.out: channel 生产了 1
    I/System.out: channel 生产了 2
    I/System.out: channel 1 消费了 1
    I/System.out: channel 生产了 3
    I/System.out: channel 1 消费了 2
    I/System.out: channel 1 消费了 3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    通过日志我们会发现 channel 2 是一个日志输出都没有,所以这就验证了我们的猜想。

    cancel()

    对于 Channel 而言,只要 cancel()close() 之后,再进行 send() 或者 receive(),就会抛出异常,不过,对于 BroadcastChannel,其中一个 ReceiveChannel 进行 cancel() 操作,并不会影响其它 ReceiveChannel:

            val channel = BroadcastChannel(3)
    
            GlobalScope.launch {
                launch {
                    val receiver = channel.openSubscription()
                    println("channel 1 消费了 ${receiver.receive()}")
                    receiver.cancel()
                }
                launch {
                    val receiver2 = channel.openSubscription()
                    val iterator2 = receiver2.iterator()
                    while (iterator2.hasNext()) {
                        println("channel 2 消费了 ${iterator2.next()}")
                        delay(500)
                    }
                }
                delay(1000)
    
                launch {
                    (1..3).forEach { value ->
                        println("channel 生产了 $value")
                        channel.send(value)
                    }
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    日志输出:

    I/System.out: channel 生产了 1
    I/System.out: channel 1 消费了 1
    I/System.out: channel 生产了 2
    I/System.out: channel 2 消费了 1
    I/System.out: channel 生产了 3
    I/System.out: channel 2 消费了 2
    I/System.out: channel 2 消费了 3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    参数说明

    public fun  BroadcastChannel(capacity: Int): BroadcastChannel
    
    • 1

    BroadcastChannel 的构造方法只有一个参数 capacity,它是容量配置,当其赋值大于等于 1 的时候,就是设置相应的容量,当其小于 1 的时候,进行额外处理,并有相应的常数:

    • RENDEZVOUS:值为 0,抛出 IllegalArgumentException 异常,内容为:“Unsupported 0 capacity for BroadcastChannel”。
    • CONFLATED:值为 -1,创建一个 合并 channel。
    • BUFFERED:值为 -2,判断系统有没有设置该 kotlinx.coroutines.channels.defaultBuffer 的默认值,假如有的话,就将该值设置为容量值,假如没有的话,就设置默认值为 64 容量。
    • UNLIMITED:值为 Int.MAX_VALUE,抛出 IllegalArgumentException 异常,内容为:“Unsupported UNLIMITED capacity for BroadcastChannel”。

    broadcast()

    针对于 Channel,有一种比较简便的方式转为 BroadcastChannel:

    val channel = Channel().broadcast(3)
    
    • 1
  • 相关阅读:
    css 块级元素与内联元素
    【洛谷 P1037】[NOIP2002 普及组] 产生数 题解(图论+深度优先搜索+排列组合+高精度)
    Codeforces Round #835 (Div. 4) D. Challenging Valleys
    操作系统---死锁
    【Redis】五大基本数据类型操作大全
    Flink集群配置
    PG维护笔记
    【小程序源码】简洁UI自带稳定接口去印
    buu web部分wp
    Jprofiler V14中文使用文档
  • 原文地址:https://blog.csdn.net/m0_46278918/article/details/125087447