Don’t communicate by sharing memory, share memory by communicating。相信学过 Go 的同学都知道这句名言, 可以说 channel 就是后边这句话的具体实现。channel 是一个类型安全的循环队列, 能够控制 groutine 在它上面读写消息的行为, 比如阻塞某个 groutine , 或者唤醒某个 groutine。
一个通道相当于一个先进先出(FIFO)的队列, 各个元素值都是严格地按照发送的顺序排列的, 先被发送通道的元素值一定会先被接收, 一个左尖括号紧接着一个减号形象地代表了元素值的传输方向。下面是创建几种不同的通道:
ch1 := make(chan int) // 无缓冲通道 ch2 := make(chan int, 3) // 有缓冲通道 ch3 := make(chan<- int, 1) // 单向通道: 只能发送不能接收 ch4 := make(<-chan int, 1) // 单向通道: 只能接收不能发送
下面举一个简单的示例:
func main() { done := make(chan struct{}) c := make(chan string) go func() { s := <-c // 接收消息 println(s) close(done) // 关闭通道, 作为结束通知 }() c <- "lvmenglou" // 发送消息 <-done // 阻塞, 知道有数据或者通道关闭}//最后输出: lvmenglou
通道发送和接收操作基本特性:
channel 的数据结构如下:
type hchan struct { qcount uint // 当前队列中剩余元素个数 dataqsiz uint // 环形队列长度, 即可以存放的元素个数 buf unsafe.Pointer // 环形队列指针 elemsize uint16 // 每个元素的大小 closed uint32 // 标识关闭状态 elemtype *_type // 元素类型 sendx uint // 队列下标, 指示元素写入时存放到队列中的位置 recvx uint // 队列下标, 指示元素从队列的该位置读出 recvq waitq // 等待读消息的 goroutine 队列 sendq waitq // 等待写消息的 goroutine 队列 lock mutex // 互斥锁, chan 不允许并发读写 }
chan 内部实现了一个环形队列作为其缓冲区, 队列的长度是创建 chan 时指定的。
向一个 channel 中写数据简单过程如下:
从一个 channel 读数据简单过程如下:
关闭 channel 时会把 recvq 中的 G 全部唤醒, 本该写入 G 的数据位置为 nil。把 sendq 中的 G 全部唤醒, 但这些 G 会 panic。
阻塞情况:
nil 通道发送数据会被阻塞。ch := make(chan int, 2)ch = nilch <- 4 // all goroutines are asleep - deadlock!
重要知识点:
ch := make(chan int, 2)
ch <- 4
close(ch)
ch <- 3 // panic: send on closed channel
阻塞情况:
重要知识点:
c := make(chan int, 3)c <- 11c <- 12close(c)for i := 0; i < cap(c)+1; i++ { x, ok := <-c println(i, ":", ok, x)}// 输出// 0: true 11// 1: true 12// 2: false 0// 3: false 0
重要知识点:
ch := make(chan int, 2)ch <- 4close(ch)close(ch) // panic: close of closed channel
我们常常会用 for-range 来读取 channel 的数据
ch := make(chan int, 1)go func(ch chan int) { for i := 0; i < 10; i++ { ch <- i } close(ch)}(ch)for val := range ch { fmt.Println(val)}
重要知识点:
select 是跟 channel 关系最亲密的语句, 它是被专门设计出来处理通道的, 因为每个 case 后面跟的都是通道表达式, 可以是读, 也可以是写。下面看一个简单的示例:
// 准备好几个通道。intChannels := [3]chan int{ make(chan int, 1), make(chan int, 1), make(chan int, 1),}// 随机选择一个通道, 并向它发送元素值。index := rand.Intn(3)fmt.Printf("The index: %d\n", index)intChannels[index] <- index// 哪一个通道中有可取的元素值, 哪个对应的分支就会被执行。select {case <-intChannels[0]: fmt.Println("The first candidate case is selected.")case <-intChannels[1]: fmt.Println("The second candidate case is selected.")case elem := <-intChannels[2]: fmt.Printf("The third candidate case is selected, the element is %d.\n", elem)default: fmt.Println("No candidate case is selected!")}
我们用一个包含了三个候选分支的 select 语句, 分别尝试从上述三个通道中接收元素值, 哪一个通道中有值, 哪一个对应的候选分支就会被执行。后面还有一个默认分支, 不过在这里它是不可能被选中的。在使用 select 语句的时候, 我们需要注意下面几个事情:
intChan := make(chan int, 1)// 一秒后关闭通道。time.AfterFunc(time.Second, func() { close(intChan)})select {case _, ok := <-intChan: if !ok { fmt.Println("The candidate case is closed.") break } fmt.Println("The candidate case is selected.")}
上面的知识需要牢记, 面试常考, 下面是讲解 select 执行的流程:
上面写的有些多, 简单总结一下: 执行 select 时, 会从左到右, 从上到下, 对每个 case 表达式求值, 当所有 case 求值完毕后, 会挑选满足的 case 执行, 如果有多条都满足, 就随机选择一条; 如果都没有满足, 就执行 default; 如果连 default 都没有, 就阻塞住, 等有满足条件的 case 出现时, 再执行。
关于 channel, 零碎的知识点非常多, 我还是想通过一个完整的示例, 将这些知识点全部串起来, 下面就以海外商城 Push 为例, 将上面知识应用到实际场景中。
海外商城需要对 W 个业务方发送 Push, 针对每个业务方, 为了提高 Push 的并发能力, 采用 N 个协程从 EMQ 中读取数据(EMQ 中都一个消息队列, 里面缓存了大量的 Push 数据), 数据读取后进行处理, 然后将处理后的数据写到 channel 中。同时, 服务有 M 个协程从 channel 中取出数据并消费, 然后通过小米 Push SDK, 给用户发送 Push。整体发送链路如下:
在看后面的内容前, 我先抛出几个问题:
初始化 channel 数组, 数组里面是每个业务方 appTypes 的 channel, channel 的缓存区大小为 30, 并启动 10 个消费者协程:
var ( messageChan map[string]chan *WorkMessage // channel stopMasterChan chan bool // 消费者结束通知 appTypes = map[int32]string{1: "shop", 2: "bbs", 3: "sharesave"})func initPushChannel() { maxSize = 30 // channel 缓存区大小 workNum = 10 // goroutine 个数 stopMasterChan = make(chan bool) messageChan = make(map[string]chan *WorkMessage) for _, name := range appTypes { workChan := make(chan *WorkMessage, maxSize) messageChan[name] = workChan for i := 0; i < workNum; i++ { go startMaster(name, workChan) // 启动消费者协程 } }}func startMaster(name string, workChan chan *WorkMessage) { for { if exit := dostartMaster(name, workChan); exit { return } }}
初始化 EMQ 的 Client, 并启动 10 个生产者协程:
var ( clientFactory client.ClientFactory // EMQ Client stopChan chan bool // 生产者结束通知)func initEmq() { // 初始化 EMQ 的 Client 和单次读取数据条数, 该处代码省略。.. maxConsumerNum := 10 stopChan = make(chan bool) for i := 0; i < maxConsumerNum; i++ { go receiveMsg(i) // 启动生产者协程 }}func receiveMsg(queueID int) { for { if exit := doReceiveMsg(queueID); exit { logz.Info("stop receive msg ...", logz.F("queueID", queueID)) return } }}
主方法调用:
func InitWorker() { // 初始化 push SDK, 逻辑省略。.. initPushChannel() // 初始化 Channel, 启动消费者 initEmq() // 启动生产者}
func doReceiveMsg(queueID int) bool { defer func() { if err := recover(); err != nil { println("[panic] recover from error.") } }() ticker := time.NewTicker(time.Second) for { select { case <-ticker.C: // 1. 从 EMQ 获取数据 List, 逻辑省略。.. // 2. 遍历 List, 获取业务类型, 逻辑省略。.. // 3. 根据业务类型, 获取对应的 channel name := "sharesave" // 示例数据 pushChannel, _ := messageChan[name] // 4. 构造 Push 数据, 然后放入 channel pushData := &WorkMessage{AppLocal: "id", AppType: 1} // 示例数据 pushChannel <- pushData case <-stopChan: println("stop to send data to channel.") return true } }}
这部分代码我做了大量简化, 这里主要做了 2 件事情:
func dostartMaster(name string, workChan chan *WorkMessage) bool { defer func() { if err := recover(); err != nil { println("[panic] recover from error.") } }() for { select { case t := <-workChan: if t != nil { for _, message := range t.PushMessages { // 接受 channel 数据 t, 将数据推给 Push SDK // 逻辑省略。.. } } case <-stopMasterChan: println("stop to get data from channel.") return true } }}
这部分代码同样做了大量简化, 这里主要做了 2 件事情:
// 通知生产者协程关闭, 协程不再写 channelfunc stopRecvMsgFromQueue() { close(stopChan)}// 通知消费者协程关闭, 协程不再读 channel, 并关闭 channel, 消费完 channel 中剩余消息 func stopPushChannel() { close(stopMasterChan) time.Sleep(time.Second) for _, c := range messageChan { close(c) for msg := range c { if msg != nil { for _, message := range msg.PushMessages { // 接受 channel 数据 t, 将数据推给 Push SDK // 逻辑省略。.. } } } }}// 主方法调用 func StopWorker() { stopRecvMsgFromQueue() time.Sleep(time.Second * 2) stopPushChannel()}
比如服务重启, 需要关闭协程时, 主要做以下事情:
这里有两个地方 sleep 了一下, 分别有以下作用:
本章基本都是干货, 上面总结的比较全面, 这里就不再重复了, 如果你能回答我提的这些问题, 你应该就掌握了本章的内容:
最后就是 Push 的并发示例, 强烈建议大家能掌握, 掌握了这个示例, 后续你应该也能很容易通过 channel 实现数据共享, 并结合 goroutine 写出你自己的高并发程序。