• Go编程源码分析channel


    0. 前言

    在go中经常谈到的一句话是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存。
    在Goroutine之间通过channel传递数据,作为Go语言的核心数据结构和Goroutine之间的通信方式,channel是支撑Go语言高性能并发编程模型的重要结构。
    在这里插入图片描述
    channel在运行时的内部表示是runtime.hchan,该结构体中包含了用于保护成员变量的互斥锁,从某种程度上说,channel是一个用于同步和通信的有锁队列。hchan结构体源码:

    
    type hchan struct {
        qcount    uint        // 循环列表元素个数
        dataqsiz  uint        // 循环队列的大小
        buf      unsafe.Pointer  // 循环队列的指针
        elemsize  uint16      // chan中元素的大小
        closed    uint32      // 是否已close
        elemtype  *_type      // chan中元素类型
        sendx    uint        // chan的发送操作处理到的位置
        recvx    uint        // chan的接收操作处理到的位置
        recvq    waitq        // 等待接收数据的Goroutine列表
        sendq    waitq        // 等待发送数据的Goroutine列表
        
        lock    mutex        // 互斥锁
    }
    
    type waitq struct {        // 双向链表
        first  *sudog
        last  *sudog
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    waitq中连接的是一个sudog双向链表,保存的是等待中的Goroutine。
    在这里插入图片描述

    1 创建channel

    使用make关键字来创建channel,**make(chan int, 3)**会调用到runtime.makechan函数

    
    const (
      maxAlign  = 8
      hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
    )
    
    func makechan(t *chantype, size int) *hchan {
      elem := t.elem
        
        // 计算需要分配的buf空间大小
      mem, overflow := math.MulUintptr(elem.size, uintptr(size))
      if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
      }
    
      var c *hchan
      switch {
      case mem == 0:
        // chan的大小或者elem的大小为0,不需要创建buf
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
      case elem.ptrdata == 0:
        // elem不含指针,分配一块连续的内存给hchan数据结构和buf
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
      default:
        // elem包含指针,单独分配buf
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
      }
    
        // 更新hchan的elemsize、elemtype、dataqsiz字段
      c.elemsize = uint16(elem.size)
      c.elemtype = elem
      c.dataqsiz = uint(size)
        
      return c
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    上述代码根据channel中收发元素的类型和缓冲区的大小初始化runtime.hchan和缓冲区:

    • 若缓冲区所需大小为0,就只会为hchan分配一段内存;
    • 若缓冲区所需大小不为0且elem不包含指针,会为hchan和buf分配一块连续的内存;
    • 若缓冲区所需大小不为0且elem包含指针,会单独为hchan和buf分配内存。

    2 发送数据到channel

    发送数据到channel,ch<-i会调用到runtime.chansend函数中,该函数包含了发送数据的全部逻辑:

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        if c == nil {
        // 对于非阻塞的发送,直接返回
        if !block {
          return false
        }
        // 对于阻塞的通道,将goroutine挂起
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
      }
        // 加锁
      lock(&c.lock)
        // channel已关闭,panic
      if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
      }
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    block表示当前的发送操作是否是阻塞调用。如果channel为空,对于非阻塞的发送,直接返回false,对于阻塞的发送,将goroutine挂起,并且永远不会返回。对channel加锁,防止多个线程并发修改数据,如果channel已关闭,报错并中止程序。

    runtime.chansend函数的执行过程可以分为以下三个部分:

    • 当存在等待的接收者时,通过runtime.send直接将数据发送给阻塞的接收者;
    • 当缓冲区存在空余空间时,将发送的数据写入缓冲区;
    • 当不存在缓冲区或缓冲区已满时,等待其他Goroutine从channel接收数据。

    2.1 直接发送

    如果目标channel没有被关闭且recvq队列中已经有处于读等待的Goroutine,那么runtime.chansend会从接收队列 recvq中取出最先陷入等待的Goroutine并直接向它发送数据,注意,由于有接收者在等待,所以如果有缓冲区,那么缓冲区一定是空的:

    
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        ...
        // 从recvq中取出一个接收者
      if sg := c.recvq.dequeue(); sg != nil { 
        // 如果接收者存在,直接向该接收者发送数据,绕过buf
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
      }
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    直接发送会调用runtime.send函数:

    
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
      ...
      if sg.elem != nil {
        // 直接把要发送的数据copy到接收者的栈空间
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
      }
      gp := sg.g
      unlockf()
      gp.param = unsafe.Pointer(sg)
      if sg.releasetime != 0 {
        sg.releasetime = cputicks()
      }
      // 设置对应的goroutine为可运行状态
      goready(gp, skip+1)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    sendDirect方法调用memmove进行数据的内存拷贝。goready方法将等待接收数据的Goroutine标记成可运行状态(Grunnable)并把该Goroutine发到发送方所在的处理器的runnext上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方。注意,只是放到了runnext中,并没有立刻执行该Goroutine。

    2.2 发送到缓冲区

    如果缓冲区未满,则将数据写入缓冲区:

    
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      ...
      // 如果缓冲区没有满,直接将要发送的数据复制到缓冲区
      if c.qcount < c.dataqsiz {
        // 找到buf要填充数据的索引位置
        qp := chanbuf(c, c.sendx)
        ...
        // 将数据拷贝到buf中
        typedmemmove(c.elemtype, qp, ep)
        // 数据索引前移,如果到了末尾,又从0开始
        c.sendx++
        if c.sendx == c.dataqsiz {
          c.sendx = 0
        }
        // 元素个数加1,释放锁并返回
        c.qcount++
        unlock(&c.lock)
        return true
      }
      ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    找到缓冲区要填充数据的索引位置,调用typedmemmove方法将数据拷贝到缓冲区中,然后重新设值sendx偏移量。

    2.3 阻塞发送

    当channel没有接收者能够处理数据时,向channel发送数据会被下游阻塞,使用select关键字可以向channel非阻塞地发送消息:

    
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
      ...
      // 缓冲区没有空间了,对于非阻塞调用直接返回
      if !block {
        unlock(&c.lock)
        return false
      }
      // 创建sudog对象
      gp := getg()
      mysg := acquireSudog()
      mysg.releasetime = 0
      if t0 != 0 {
        mysg.releasetime = -1
      }
      mysg.elem = ep
      mysg.waitlink = nil
      mysg.g = gp
      mysg.isSelect = false
      mysg.c = c
      gp.waiting = mysg
      gp.param = nil
      // 将sudog对象入队
      c.sendq.enqueue(mysg)
      // 进入等待状态
      gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
      ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    对于非阻塞的调用会直接返回,对于阻塞的调用会创建sudog对象并将sudog对象加入发送等待队列。调用gopark将当前Goroutine转入waiting状态。调用gopark之后,在使用者看来向该channel发送数据的代码语句会被阻塞。

    在这里插入图片描述
    注意,发送数据的过程中包含几个会触发Goroutine调度的时机:

    • 注意,发送数据的过程中包含几个会触发Goroutine调度的时机:
    • 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入channel的sendq队列并调用gopark触发Goroutine的调度让出处理器的使用权。

    3 从chan接收数据

    从channel获取数据最终调用到runtime.chanrecv函数

    
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
      if c == nil {
            // 如果c为空且是非阻塞调用,直接返回
        if !block {
          return
        }
            // 阻塞调用直接等待
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
      }
      ···
      lock(&c.lock)
        // 如果c已经关闭,并且c中没有数据,返回
      if c.closed != 0 && c.qcount == 0 {
        unlock(&c.lock)
        if ep != nil {
          typedmemclr(c.elemtype, ep)
        }
        return true, false
      }
        ···
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    当从一个空channel接收数据时,直接调用gopark让出处理器使用权。如果当前channel已被关闭且缓冲区中没有数据,直接返回。

    runtime.chanrecv函数的具体执行过程可以分为以下三个部分:

    • 当存在等待的发送者时,通过runtime.recv从阻塞的发送者或者缓冲区中获取数据;
    • 当缓冲区存在数据时,从channel的缓冲区中接收数据;
    • 当缓冲区中不存在数据时,等待其他Goroutine向channel发送数据。

    3.1 直接接收

    当channel的sendq队列中包含处于发送等待状态的Goroutine时,调用runtime.recv直接从这个发送者那里提取数据。注意,由于有发送者在等待,所以如果有缓冲区,那么缓冲区一定是满的。

    
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
      ...
      // 从发送者队列获取数据
      if sg := c.sendq.dequeue(); sg != nil { 
        // 发送者队列不为空,直接从发送者那里提取数据
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
      } 
      ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    主要看一下runtime.recv的实现:

    
    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        // 如果是无缓冲区chan
      if c.dataqsiz == 0 {
        if ep != nil {
                // 直接从发送者拷贝数据
          recvDirect(c.elemtype, sg, ep)
        }
        // 有缓冲区chan
      } else {
            // 获取buf的存放数据指针
        qp := chanbuf(c, c.recvx)
            // 直接从缓冲区拷贝数据给接收者
        if ep != nil {
          typedmemmove(c.elemtype, ep, qp)
        }
            // 从发送者拷贝数据到缓冲区
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
        // 设置对应的goroutine为可运行状态
      goready(gp, skip+1)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    该函数会根据缓冲区的大小分别处理不同的情况:

    • 如果channel不存在缓冲区:直接从发送者那里提取数据。
    • 如果channel存在缓冲区:将缓冲区中的数据拷贝到接收方的内存地址;将发送者数据拷贝到缓冲区,并唤醒发送者。
      无论发生哪种情况,运行时都会调用goready将等待发送数据的Goroutine标记成可运行状态(Grunnable)并将当前处理器的runnext设置成发送数据的Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。

    3.2 从缓冲区接收

    如果channel缓冲区中有数据且发送者队列中没有等待发送的Goroutine时,直接从缓冲区中recvx的索引位置取出数据:

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
      ...
        // 如果缓冲区中有数据
      if c.qcount > 0 {
        qp := chanbuf(c, c.recvx)
            // 从缓冲区复制数据到ep
        if ep != nil {
          typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
            // 接收数据的指针前移
        c.recvx++
            // 环形队列,如果到了末尾,再从0开始
        if c.recvx == c.dataqsiz {
          c.recvx = 0
        }
            // 缓冲区中现存数据减一
        c.qcount--
            unlock(&c.lock)
        return true, true
      }
      ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    3.3 阻塞接收

    当channel的发送队列中不存在等待的Goroutine并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会被阻塞,使用 select 关键字可以非阻塞地接收消息:

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
      ...
      // 非阻塞,直接返回
      if !block {
        unlock(&c.lock)
        return false, false
      } 
      // 创建sudog
      gp := getg()
      mysg := acquireSudog()
      ···
      gp.waiting = mysg
      mysg.g = gp
      mysg.isSelect = false
      mysg.c = c
      gp.param = nil
      // 将sudog添加到等待接收队列中
      c.recvq.enqueue(mysg)
      // 阻塞Goroutine,等待被唤醒
      gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
      ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    如果是非阻塞调用,直接返回。阻塞调用会将当前Goroutine封装成sudog,然后将sudog添加到等待接收队列中,调用gopark让出处理器的使用权并等待调度器的调度。

    注意,接收数据的过程中包含几个会触发Goroutine调度的时机:

    1. 当channel为空时
    2. 当channel的缓冲区中不存在数据并且sendq中也不存在等待的发送者时

    4 关闭chan

    关闭通道会调用到runtime.closechan方法:

    
    func closechan(c *hchan) {
        // 校验逻辑
        ...
        lock(&c.lock)
        // 设置chan已关闭
      c.closed = 1
      var glist gList
        // 获取所有接收者
      for {
        sg := c.recvq.dequeue()
        if sg == nil {
          break
        }
        if sg.elem != nil {
          typedmemclr(c.elemtype, sg.elem)
          sg.elem = nil
        }
        gp := sg.g
        gp.param = nil
        glist.push(gp)
      }
      // 获取所有发送者
      for {
        sg := c.sendq.dequeue()
        ...
      }
        unlock(&c.lock)
        // 唤醒所有glist中的goroutine
      for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    将recvq和sendq两个队列中的Goroutine加入到gList中,并清除所有sudog上未被处理的元素。最后将所有glist中的Goroutine加入调度队列,等待被唤醒。注意,发送者在被唤醒之后会panic。

    5 总结

    在这里插入图片描述

  • 相关阅读:
    关于C#导出Word时报错“{00020970-0000-0000-C000-000000000046}加载类型库/DLL 时出错”的解决办法
    信息学奥赛一本通2060:【例1.1】计算机输出
    矩阵可交换的定义和性质
    8.ProForm 遇到表单的label ,文案可能过长
    ansible User 模块
    字节跳动面试真题-最长回文子串
    一文读懂如何部署具有外部数据库的高可用 K3s
    LeetCode-667. 优美的排列 II【数组,数学】
    leetcode70爬楼梯
    双十二怎么入手物品,2022年双十二值得入手的好物分享
  • 原文地址:https://blog.csdn.net/apple_56973763/article/details/127132359