• golang中的信号量的实现原理


    概述

    我们前面讲过 操作系统的信号量,以及 golang中的Mutex原理解析,就抛出了一个问题,操作系统的信号量的管理对象是线程,而 Mutex 中使用的信号量是针对协程的,那么这就意味着golang需要重新实现一套基于协程的信号量,随着对golang源码的研究,我发现golang的 runtime 就像一个微型的操作系统,功能非常强大。

    go version go1.18.3 windows/amd64

    // src/runtime/sema.go
    
    // Semaphore implementation exposed to Go.
    // Intended use is provide a sleep and wakeup
    // primitive that can be used in the contended case
    // of other synchronization primitives.
    // Thus it targets the same goal as Linux's futex,
    // but it has much simpler semantics.
    //
    // That is, don't think of these as semaphores.
    // Think of them as a way to implement sleep and wakeup
    // such that every sleep is paired with a single wakeup,
    // even if, due to races, the wakeup happens before the sleep.
    //
    // See Mullender and Cox, ``Semaphores in Plan 9,''
    // https://swtch.com/semaphore.pdf
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    具体的用法是提供 sleep 和 wakeup 原语
    以使其能够在其它同步原语中的竞争情况下使用
    因此这里的 semaphore 和 Linux 中的 futex 目标是一致的
    只不过语义上更简单一些

    也就是说,不要认为这些是信号量
    把这里的东西看作 sleep 和 wakeup 实现的一种方式
    每一个 sleep 都会和一个 wakeup 配对
    即使在发生 race 时,wakeup 在 sleep 之前时也是如此

    上面提到了和futex作用一样,关于futex

    futex(快速用户区互斥的简称)是一个在Linux上实现锁定和构建高级抽象锁如信号量和POSIX互斥的基本工具。

    Futex 由一块能够被多个进程共享的内存空间(一个对齐后的整型变量)组成;这个整型变量的值能够通过汇编语言调用CPU提供的原子操作指令来增加或减少,并且一个进程可以等待直到那个值变成正数。Futex 的操作几乎全部在用户空间完成;只有当操作结果不一致从而需要仲裁时,才需要进入操作系统内核空间执行。这种机制允许使用 futex 的锁定原语有非常高的执行效率:由于绝大多数的操作并不需要在多个进程之间进行仲裁,所以绝大多数操作都可以在应用程序空间执行,而不需要使用(相对高代价的)内核系统调用。

    go中的semaphore作用和futex目标一样,提供sleepwakeup原语,使其能够在其它同步原语中的竞争情况下使用。当一个goroutine需要休眠时,将其进行集中存放,当需要wakeup时,再将其取出,重新放入调度器中。

    主要源码

    // src/sync/runtime.go
    
    // SemacquireMutex is like Semacquire, but for profiling contended Mutexes.
    // If lifo is true, queue waiter at the head of wait queue.
    // skipframes is the number of frames to omit during tracing, counting from
    // runtime_SemacquireMutex's caller.
    func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
    
    // ----------------------------------------------------------------
    
    // src/runtime/sema.go
    
    type semaRoot struct {
    	lock  mutex
    	treap *sudog // root of balanced tree of unique waiters.
    	nwait uint32 // Number of waiters. Read w/o the lock.
    }
    
    // Prime to not correlate with any user patterns.
    const semTabSize = 251
    
    var semtable [semTabSize]struct {
    	root semaRoot
    	pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
    }
    
    //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
    func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {
    	semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
    }
    
    func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {
    	// 获取当前协程
        gp := getg()
    	if gp != gp.m.curg {
    		throw("semacquire not on the G stack")
    	}
    
    	// Easy case.
    	if cansemacquire(addr) {
    		return
    	}
    
    	// Harder case:
    	//	increment waiter count
    	//	try cansemacquire one more time, return if succeeded
    	//	enqueue itself as a waiter
    	//	sleep
    	//	(waiter descriptor is dequeued by signaler)
    	s := acquireSudog()
    	root := semroot(addr)
    	t0 := int64(0)
    	s.releasetime = 0
    	s.acquiretime = 0
    	s.ticket = 0
    	if profile&semaBlockProfile != 0 && blockprofilerate > 0 {
    		t0 = cputicks()
    		s.releasetime = -1
    	}
    	if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {
    		if t0 == 0 {
    			t0 = cputicks()
    		}
    		s.acquiretime = t0
    	}
    	for {
    		lockWithRank(&root.lock, lockRankRoot)
    		// Add ourselves to nwait to disable "easy case" in semrelease.
    		atomic.Xadd(&root.nwait, 1)
    		// Check cansemacquire to avoid missed wakeup.
    		if cansemacquire(addr) {
    			atomic.Xadd(&root.nwait, -1)
    			unlock(&root.lock)
    			break
    		}
    		// Any semrelease after the cansemacquire knows we're waiting
    		// (we set nwait above), so go to sleep.
    		root.queue(addr, s, lifo)
    		goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
    		if s.ticket != 0 || cansemacquire(addr) {
    			break
    		}
    	}
    	if s.releasetime > 0 {
    		blockevent(s.releasetime-t0, 3+skipframes)
    	}
    	releaseSudog(s)
    }
    
    func cansemacquire(addr *uint32) bool {
    	for {
    		v := atomic.Load(addr)
    		if v == 0 {
    			return false
    		}
    		if atomic.Cas(addr, v, v-1) {
    			return true
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

    cansemacquire(),此函数通过原子操作来修改和判断信号量的值。此处加载的包是runtime/internal/atomic,对应的函数。

    //go:noescape
    func Cas(ptr *uint32, old, new uint32) bool
    
    // src/runtime/internal/atomic/atomic_amd64.s
    
    // bool Cas(int32 *val, int32 old, int32 new)
    // Atomically:
    //	if(*val == old){
    //		*val = new;
    //		return 1;
    //	} else
    //		return 0;
    TEXT ·Cas(SB),NOSPLIT,$0-17
    	MOVQ	ptr+0(FP), BX
    	MOVL	old+8(FP), AX
    	MOVL	new+12(FP), CX
    	LOCK
    	CMPXCHGL	CX, 0(BX)
    	SETEQ	ret+16(FP)
    	RET
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    addr 为 uint32 类型,那么atomic.Cas(addr, v, v-1)最低只能将其值修改到0,如果v-1 < 0,那么就会返回 false,并放弃修改,这就实现了比较和修改的原子化。

    golang中的信号量没有做初始化,默认值是0,那么在阅读函数cansemacquire()的时候肯定会有疑惑。实际上,在充分理解了 Mutex 和 RWMutex 源码之后才会知道,golang中不对 sema 做初始化,它们的使用规范是先释放信号量,再获取信号量,如果还不理解可以看看golang中的Mutex原理解析

    这里的Easy case 和 Harder case就是Fast path 和 slow path,golang源码中对于循环代码块都喜欢这个干。

    skipframe 参数是用作trace跟踪性能分析用的,包括releasetimeacquiretime

    数据结构

    看到这里要先停下来搞清楚semtable, semaRoot, sudug的关系。

    addr 为一个信号量的地址,在一个程序中可能存在多个信号量,那么这些 addr 会被放入 semtable 数组中,采用取模的方式,semtable 长度为251,在声明的时候就做了初始化,每一个元素中包含一个 semaRoot,而 semaRoot 中包含一个平衡二叉数结构,用来存储着竞争信号量的协程 sudug。

    func semroot(addr *uint32) *semaRoot {
    	return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
    }
    
    // sudog represents a g in a wait list, such as for sending/receiving
    // on a channel.
    //
    // sudog is necessary because the g ↔ synchronization object relation
    // is many-to-many. A g can be on many wait lists, so there may be
    // many sudogs for one g; and many gs may be waiting on the same
    // synchronization object, so there may be many sudogs for one object.
    //
    // sudogs are allocated from a special pool. Use acquireSudog and
    // releaseSudog to allocate and free them.
    type sudog struct {
    	// The following fields are protected by the hchan.lock of the
    	// channel this sudog is blocking on. shrinkstack depends on
    	// this for sudogs involved in channel ops.
    
    	g *g
    
    	next *sudog
    	prev *sudog
    	elem unsafe.Pointer // data element (may point to stack)
    
    	// The following fields are never accessed concurrently.
    	// For channels, waitlink is only accessed by g.
    	// For semaphores, all fields (including the ones above)
    	// are only accessed when holding a semaRoot lock.
    
    	acquiretime int64
    	releasetime int64
    	ticket      uint32
    
    	// isSelect indicates g is participating in a select, so
    	// g.selectDone must be CAS'd to win the wake-up race.
    	isSelect bool
    
    	// success indicates whether communication over channel c
    	// succeeded. It is true if the goroutine was awoken because a
    	// value was delivered over channel c, and false if awoken
    	// because c was closed.
    	success bool
    
    	parent   *sudog // semaRoot binary tree
    	waitlink *sudog // g.waiting list or semaRoot
    	waittail *sudog // semaRoot
    	c        *hchan // channel
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    在这里插入图片描述

    取模的过程肯定会存在冲突,类似于哈希冲突,因此不同的 addr 可能会被定位到同一个 semaRoot,那么在操作 semaRoot 的时候依然还需要带上 addr 参数,并将 addr 参数填充到 sudug 的 elem 字段,比如root.queue(addr) 和 root.dequeue(addr)操作。

    lifo为后进先出模式,fifo为先进先出。

    sudug 的结构比较丰富,即可以通过它来构造一个平衡二叉树(parent, prev, next),又可以构造一个单向链表(waitlink, waittail),并且可以同时存在。二叉树的查找是为了满足多个 addr 通过取模后落到了同一个位置,提高查询效率,二叉树的每一个节点都意味着不同的 addr,所以相同的 addr 进来之后发现在二叉树上存在这个 addr 的节点,那么就会作为单向链表节点挂在这个节点下面。

    在这里插入图片描述

    sudug 的waittail都指向链表的最后一个元素。

    关于sleep和wakeup协程

    与线程的挂起和唤醒原理类似,在前面成功的将协程加入到 semaRoot 之后,只需要将协程的状态设置为 Gwaiting 就可以实现挂起,而唤醒的过程是将其移出 semaRoot ,修改状态,加入到就绪队列。

    // src/runtime/sema.go
    func readyWithTime(s *sudog, traceskip int) {
    	if s.releasetime != 0 {
    		s.releasetime = cputicks()
    	}
    	goready(s.g, traceskip)
    }
    
    // src/runtime/proc.go
    
    // Puts the current goroutine into a waiting state and unlocks the lock.
    // The goroutine can be made runnable again by calling goready(gp).
    func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) {
    	gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
    }
    
    func goready(gp *g, traceskip int) {
    	systemstack(func() {
    		ready(gp, traceskip, true)
    	})
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    // Gosched yields the processor, allowing other goroutines to run. It does not
    // suspend the current goroutine, so execution resumes automatically.
    func Gosched() {
    	checkTimeouts()
    	mcall(gosched_m)
    }
    
    // goyield is like Gosched, but it:
    // - emits a GoPreempt trace event instead of a GoSched trace event
    // - puts the current G on the runq of the current P instead of the globrunq
    func goyield() {
    	checkTimeouts()
    	mcall(goyield_m)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    readyWithTime() 把 sudog 对应的 g 唤醒,并且放到P本地队列的下一个执行位置。

    goyield()是调度控制,让出执行权,并放到P本地队列的的队尾,并不会挂起。

    runtime.Gosched()是调度控制,让出执行权,并放到全局队列的的队尾,并不会挂起。

    关于lock

    在对二叉树做操作的时候肯定是要加锁的,显然这个锁是要加在 semaRoot 上的,而采用 semtable 分散化在一定程度上可以降低锁的粒度。

    golang通过 sema 来实现 sync.Mutex,然后在实现 sema 的时候又用了 mutex,那么这里的 mutex 是什么呢?

    相关函数

    // Mutual exclusion locks.  In the uncontended case,
    // as fast as spin locks (just a few user-level instructions),
    // but on the contention path they sleep in the kernel.
    // A zeroed Mutex is unlocked (no need to initialize each lock).
    // Initialization is helpful for static lock ranking, but not required.
    type mutex struct {
    	// Empty struct if lock ranking is disabled, otherwise includes the lock rank
    	lockRankStruct
    	// Futex-based impl treats it as uint32 key,
    	// while sema-based impl as M* waitm.
    	// Used to be a union, but unions break precise GC.
    	key uintptr
    }
    
    goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
    lockWithRank(&root.lock, lockRankRoot)
    unlock(&root.lock)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    // src/runtime/lock_sema.go
    
    func lock2(l *mutex) {
    	gp := getg()
    	if gp.m.locks < 0 {
    		throw("runtime·lock: lock count")
    	}
    	gp.m.locks++
    
    	// Speculative grab for lock.
    	if atomic.Casuintptr(&l.key, 0, locked) {
    		return
    	}
    	semacreate(gp.m)
    
    	// On uniprocessor's, no point spinning.
    	// On multiprocessors, spin for ACTIVE_SPIN attempts.
    	spin := 0
    	if ncpu > 1 {
    		spin = active_spin
    	}
    Loop:
    	for i := 0; ; i++ {
    		v := atomic.Loaduintptr(&l.key)
    		if v&locked == 0 {
    			// Unlocked. Try to lock.
    			if atomic.Casuintptr(&l.key, v, v|locked) {
    				return
    			}
    			i = 0
    		}
    		if i < spin {
    			procyield(active_spin_cnt)
    		} else if i < spin+passive_spin {
    			osyield()
    		} else {
    			// Someone else has it.
    			// l->waitm points to a linked list of M's waiting
    			// for this lock, chained through m->nextwaitm.
    			// Queue this M.
    			for {
    				gp.m.nextwaitm = muintptr(v &^ locked)
    				if atomic.Casuintptr(&l.key, v, uintptr(unsafe.Pointer(gp.m))|locked) {
    					break
    				}
    				v = atomic.Loaduintptr(&l.key)
    				if v&locked == 0 {
    					continue Loop
    				}
    			}
    			if v&locked != 0 {
    				// Queued. Wait.
    				semasleep(-1)
    				i = 0
    			}
    		}
    	}
    }
    
    //go:nowritebarrier
    // We might not be holding a p in this code.
    func unlock2(l *mutex) {
    	gp := getg()
    	var mp *m
    	for {
    		v := atomic.Loaduintptr(&l.key)
    		if v == locked {
    			if atomic.Casuintptr(&l.key, locked, 0) {
    				break
    			}
    		} else {
    			// Other M's are waiting for the lock.
    			// Dequeue an M.
    			mp = muintptr(v &^ locked).ptr()
    			if atomic.Casuintptr(&l.key, v, uintptr(mp.nextwaitm)) {
    				// Dequeued an M.  Wake it.
    				semawakeup(mp)
    				break
    			}
    		}
    	}
    	gp.m.locks--
    	if gp.m.locks < 0 {
    		throw("runtime·unlock: lock count")
    	}
    	if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstack
    		gp.stackguard0 = stackPreempt
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89

    golang中能同时并行执行的G的个数其实就是逻辑CPU的个数,也就是GMP模型中的M个数,此时每一个M上正在运行一个G,而这些G同时都在抢 mutex 来操作二叉树,通过源码可以大致判断出,此处是直接对M加的锁,通过atomic.Casuintptr(&l.key, 0, 1)来限制只能是第一个G能操作成功,从而能获得锁,其他的G则要继续往下走,先是自旋一定次数获取锁,还是不行的话就调用操作系统的信号量来对线程M进行阻塞,自然G也就没法执行了,要知道,这个锁只发生在对二叉树的操作前后,时间很短,当然如果要抢锁的G过多肯定会造成M被锁的时间变长。

  • 相关阅读:
    HTML+CSS滚动条样式如何单独给firefox设置 scrollbar-width: none;,而不影响其他浏览器
    P1803 凌乱的yyy / 线段覆盖 【贪心】
    Jetpack Compose 入门教程之Text
    《推进农业水价综合改革的意见》解读
    PHP写一个 电商Api接口需要注意哪些?考虑哪些?
    python公司员工考勤工资管理系统django662
    基于SSM实现企业生资源管理系统-ERP系统
    第4章SpringBoot ⽇志
    Face Global | 创龙科技2款新品登陆TI全球官网
    【学习】手写数字生成
  • 原文地址:https://blog.csdn.net/raoxiaoya/article/details/125991670