对于 channel 而言,更多面向的是一对一的情况,因为一旦 receive() 后,该数据就被获取了,后续再 receive() ,也无法拿到之前的数据。但是,类似的,Flow 与 sharedflow 是相对应的,与 channel 相对应的是 BroadcastChannel。
GlobalScope.launch {
List(2) { index ->
launch {
val receiver = channel.openSubscription()
val iterator = receiver.iterator()
while (iterator.hasNext()) {
println("channel $index 消费了 ${iterator.next()}")
}
}
}
delay(1000)
launch {
(1..3).forEach { value ->
println("channel 生产了 $value")
channel.send(value)
}
}
}
日志输出:
I/System.out: channel 生产了 1
I/System.out: channel 1 消费了 1
I/System.out: channel 0 消费了 1
I/System.out: channel 生产了 2
I/System.out: channel 1 消费了 2
I/System.out: channel 0 消费了 2
I/System.out: channel 生产了 3
I/System.out: channel 1 消费了 3
I/System.out: channel 0 消费了 3
BroadcastChannel 并不是直接通过 receive() 进行获取数据,而且先执行 openSubscription() 获得 ReceiveChannel,然后在通过 ReceiveChannel 去进行 receive() ,在这里我为了方便遍历就使用了 iterator() 。
有一点要额外注意,就是 sharedflow 和 stateflow 都是先 emit() 数据,再进行 collect() 的,而 BroadcastChannel 是刚好相反,是先注册,然后再获取数据的,所以,当 BroadcastChannel 已经 send() 后,再执行 openSubscription() 是无法获取数据的。下面我们通过这个小栗子验证下:
val channel = BroadcastChannel(3)
GlobalScope.launch {
launch {
val receiver = channel.openSubscription()
val iterator = receiver.iterator()
while (iterator.hasNext()) {
println("channel 1 消费了 ${iterator.next()}")
}
}
delay(1000)
launch {
(1..3).forEach { value ->
println("channel 生产了 $value")
channel.send(value)
}
}
delay(1000)
launch {
val receiver = channel.openSubscription()
val iterator = receiver.iterator()
while (iterator.hasNext()) {
println("channel 2 消费了 ${iterator.next()}")
}
}
}
日志输出:
I/System.out: channel 生产了 1
I/System.out: channel 生产了 2
I/System.out: channel 1 消费了 1
I/System.out: channel 生产了 3
I/System.out: channel 1 消费了 2
I/System.out: channel 1 消费了 3
通过日志我们会发现 channel 2 是一个日志输出都没有,所以这就验证了我们的猜想。
对于 Channel 而言,只要 cancel() 或 close() 之后,再进行 send() 或者 receive(),就会抛出异常,不过,对于 BroadcastChannel,其中一个 ReceiveChannel 进行 cancel() 操作,并不会影响其它 ReceiveChannel:
val channel = BroadcastChannel(3)
GlobalScope.launch {
launch {
val receiver = channel.openSubscription()
println("channel 1 消费了 ${receiver.receive()}")
receiver.cancel()
}
launch {
val receiver2 = channel.openSubscription()
val iterator2 = receiver2.iterator()
while (iterator2.hasNext()) {
println("channel 2 消费了 ${iterator2.next()}")
delay(500)
}
}
delay(1000)
launch {
(1..3).forEach { value ->
println("channel 生产了 $value")
channel.send(value)
}
}
}
日志输出:
I/System.out: channel 生产了 1
I/System.out: channel 1 消费了 1
I/System.out: channel 生产了 2
I/System.out: channel 2 消费了 1
I/System.out: channel 生产了 3
I/System.out: channel 2 消费了 2
I/System.out: channel 2 消费了 3
public fun BroadcastChannel(capacity: Int): BroadcastChannel
BroadcastChannel 的构造方法只有一个参数 capacity,它是容量配置,当其赋值大于等于 1 的时候,就是设置相应的容量,当其小于 1 的时候,进行额外处理,并有相应的常数:
IllegalArgumentException 异常,内容为:“Unsupported 0 capacity for BroadcastChannel”。kotlinx.coroutines.channels.defaultBuffer 的默认值,假如有的话,就将该值设置为容量值,假如没有的话,就设置默认值为 64 容量。IllegalArgumentException 异常,内容为:“Unsupported UNLIMITED capacity for BroadcastChannel”。针对于 Channel,有一种比较简便的方式转为 BroadcastChannel:
val channel = Channel().broadcast(3)