• go-sync-mutex


    Sync

    ​ Go 语言作为一个原生支持用户态进程(Goroutine)的语言,当提到并发编程、多线程编程时,往往都离不开锁这一概念。锁是一种并发编程中的同步原语(Synchronization Primitives),它能保证多个 Goroutine 在访问同一片内存时不会出现竞争条件(Race condition)等问题。

    通过atomic.CompareAndSwapInt32调用汇编CAS(compare and swap)指令的原子性来实现临界区的互斥访问,保证只有一个协程获取到锁

    ​ 当其中一个 goroutine 获得了这个锁,其他 goroutine 尝试获取这个锁时将会被阻塞,直到持有锁的 goroutine 释放锁为止。

    ​ Go 语言在 sync 包中提供了用于同步的一些基本原语,包括常见的 sync.Mutexsync.RWMutexsync.WaitGroupsync.Oncesync.Cond

    !Mutex互斥锁

    ​ Go 语言的 sync.Mutex 由两个字段 statesema 组成。其中 state 表示当前互斥锁的状态,而 sema 是用于控制锁状态的信号量。

    type Mutex struct {
        state int32
        sema uint32		// 指针地址 0xF,存着结构体的地址
    }
    
    • 1
    • 2
    • 3
    • 4

    Mutex.state

    状态字段

    int32类型的state代表:

    • locked: 锁状态 1被锁 0未被锁

    • woken:1是否有goroutine模式被唤醒,0未被唤醒

    • starving:1进入饥饿模式,0正常模式

    • 其他位:代表获取锁的等待队列中的协程数,state是int32类型,说明是32bit,其余位是32-3 bits,所以最大排队协程数就是2^(32-3)

    锁模式

    • 正常模式:队头和新协程的抢占,未抢占到的扔到队尾
    • 饥饿模式:按顺序获取锁,不得插队,防止队尾一直阻塞等待
    正常模式

    在正常模式下获取锁:

    1. 多线程下竞争锁,获取成功返回,修改sync.Mutex结构体字段。获取失败,自旋等待其他线程释放锁,4次之后仍然拿不到锁,goroutine加入到等待队列尾部,状态改成_GWaiting
    2. 获取到锁的线程释放锁,从等待队列头部唤醒一个Goroutine,状态改成_Grunning,他会和新创建并且获取锁的新goroutine(M正在运行的g_Grunning)争抢锁。
      1. 如果被唤醒的G仍然未能抢到锁,goroutine加入到等待队列头部,状态改成_GWaiting
      2. 如果被唤醒的G抢到锁,新创建的G相当于重新进入1步骤
    饥饿模式

    在饥饿模式下获取锁:

    互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会切换回正常模式。

    锁模型切换
    • 正常模式切换到饥饿模式:被唤醒的 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被『饿死』。
    • 饥饿模式换到正常模式切:
      • 一个 Goroutine 获得了互斥锁并且它在队列的末尾,说明没有协程在竞争了,切换到正常模式
      • 被唤醒的 Goroutine 获得锁没超过 1ms ,切换到正常模式

    Mutex.Sema

    控制锁状态的信号量(互斥信号量)

    // runtime/sema.go
    type semaRoot struct {
    	lock  mutex
    	treap *sudog // 锁抢占者的 平衡树的根
    	nwait uint32 // 抢占锁的goroutine的数量
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    互斥锁加锁/解锁

    • func (m *Mutex) Lock():Lock方法锁住m,如果m已经加锁,则阻塞直到m解锁。
    func (m *Mutex) Lock() {
    	// 未锁状态,获取锁return
    	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
    		if race.Enabled {
    			race.Acquire(unsafe.Pointer(m))
    		}
    		return
    	}
    	// Slow path (outlined so that the fast path can be inlined)
    	m.lockSlow()
    }
    
    func (m *Mutex) lockSlow() {
    	var waitStartTime int64	// 协程抢占锁时间,时间超出,锁变成饥饿模式
    	starving := false
    	awoke := false
    	iter := 0
    	old := m.state
    	for {
    		// 锁住状态下 and 不是饥模式 and 在可自旋次数下 进入
    		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
    			// awoke标记是false and 锁非唤醒状态 and 锁的等待者大于0  
                // 满足这些条件,把锁变成唤醒状态
                // awoke flag标记成true
    			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    				awoke = true
    			}
                // 自旋 汇编
    			runtime_doSpin()
                // 累计自选次数
    			iter++
                
                // 把唤醒状态 覆盖 old
    			old = m.state
               
    			continue
    		}
            
            // 可能其他协程更改了锁状态:改成了`未锁住状态` 
            // 以下操作就有AB两种情况
            // A情况: 锁住状态 且 饥饿模式 (自旋次数超过4次)
            // B情况: 未锁住
            
            //拿到最新锁状态
    		new := old
    
    		// old不是饥饿模式(排除A情况),那是B情况,把new设置成锁状态
    		if old&mutexStarving == 0 {
    			new |= mutexLocked
    		}
            
            // old 是 锁住状态 或 是饥饿模式。
            // 等待数+1 (当前协程加入等待)
    		if old&(mutexLocked|mutexStarving) != 0 {
    			new += 1 << mutexWaiterShift
    		}
            
            // 饥饿标识非空 and old是锁住状态。 (第一次进入 且 A情况)
            // new设置成饥饿状态
    		if starving && old&mutexLocked != 0 {
    			new |= mutexStarving
    		}
            
            // awoke标识是 唤醒状态
    		if awoke {
    			// new不是唤醒状态,锁标识不对,panic
    			if new&mutexWoken == 0 {
    				throw("sync: inconsistent mutex state")
    			}
                // &^ 想异的位保留,相同的位清0。 非唤醒状态 变成 唤醒, 唤醒状态下变成非唤醒
    			new &^= mutexWoken
    		}
            
            // 此时new的3个字段状态 : 锁住,饥饿,唤醒状态未知
            // 如果状态没有被其他协程改变,状态更改成new
    		if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // 如果状态是非锁住 and 非饥饿模式 
                // compareAndSwapInt32已经改成锁住,break for
    			if old&(mutexLocked|mutexStarving) == 0 {
    				break // locked the mutex with CAS
    			}
    			
                // 设置排队者的开始等待时间
    			queueLifo := waitStartTime != 0
    			if waitStartTime == 0 {
    				waitStartTime = runtime_nanotime()
    			}
                
                // 信号量设置,阻塞等待(信号量的P操作,协程间通信)
    			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
                
                // 标记 饥饿标识, 如果是饥饿标识是true 或者 大于饥饿阈值 
    			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
                
                // 获取最新锁状态,虽然前面compareAndSwap已经改成了m.state : 锁住,饥饿,唤醒状态未知。但是前面阻塞有可能其他协程更改了状态
    			old = m.state
                
                // 锁是饥饿模式
    			if old&mutexStarving != 0 {
    				
                    // 锁是 锁住状态 或者 唤醒状态 或者 等待者为0个时
                    // 抛出
    				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
    					throw("sync: inconsistent mutex state")
    				}
                    
                    // 
    				delta := int32(mutexLocked - 1<<mutexWaiterShift)
                    
                    // 非贪婪模式 或则 等待者为1时
    				if !starving || old>>mutexWaiterShift == 1 {
    					delta -= mutexStarving
    				}
    				atomic.AddInt32(&m.state, delta)
    				break
    			}
    			awoke = true
    			iter = 0
    		} else {
    			old = m.state
    		}
    	}
    
    	if race.Enabled {
    		race.Acquire(unsafe.Pointer(m))
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • func (m *Mutex) Unlock():Unlock方法解锁m,如果m未加锁会导致运行时错误。锁和线程无关,可以由不同的线程加锁和解锁。
    func (m *Mutex) Unlock() {
    	if race.Enabled {
    		_ = m.state
    		race.Release(unsafe.Pointer(m))
    	}
    
    	// Fast path: drop lock bit.
    	new := atomic.AddInt32(&m.state, -mutexLocked)
    	if new != 0 {
    		// Outlined slow path to allow inlining the fast path.
    		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
    		m.unlockSlow(new)
    	}
    }
    
    func (m *Mutex) unlockSlow(new int32) {
    	if (new+mutexLocked)&mutexLocked == 0 {
    		throw("sync: unlock of unlocked mutex")
    	}
    	if new&mutexStarving == 0 {
    		old := new
    		for {
    			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    				return
    			}
    			// Grab the right to wake someone.
    			new = (old - 1<<mutexWaiterShift) | mutexWoken
    			if atomic.CompareAndSwapInt32(&m.state, old, new) {
    				runtime_Semrelease(&m.sema, false, 1)
    				return
    			}
    			old = m.state
    		}
    	} else {
    		// 信号量中的V操作
    		runtime_Semrelease(&m.sema, true, 1)
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    信号量:信号量有两种原子操作,他们必须成对出现
    P操作:信号量 减1,当信号量 <0 ,表明资源被占用,进程阻塞。 当信号量>=0,表明资源被释放(可用),进程可继续执行
    V操作:信号量加1,当信号量<=0时,代表有阻塞中进程。当信号量>0,表明没有阻塞中进程,无需操作
    互斥信号量,默认值为1
    ————————————————
    版权声明:本文为CSDN博主「我是你的小阿磊」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qiu18610714529/article/details/109062176

    example

    import "sync"
    
    func main() {
    
    	m := sync.Mutex{}
    	go user1(&m)
    	go user2(&m)
    
    	signalChan := make(chan os.Signal, 1)
    	signal.Notify(signalChan, os.Interrupt)
    	select {
    	case <-signalChan:
    		fmt.Println("catch interrupt signal")
    		break
    	}
    }
    
    func printer(str string, m *sync.Mutex) {
    	m.Lock()         //加锁
    	defer m.Unlock() //解锁
    	for _, ch := range str {
    		fmt.Printf("%c", ch)
    		time.Sleep(time.Millisecond * 1)
    	}
    }
    func user1(m *sync.Mutex) {
    	printer("hello ", m)
    }
    func user2(m *sync.Mutex) {
    	printer("world", m)
    }
    
    //打印结果
    worldhello 或者 helloworld: 两个单词是有序的,不像`heworllldo`两个协程同时打印,说明某个协程会在mutex.Lock()进行自旋等待获取锁
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    RWMutex读写互斥锁

    读写互斥锁 sync.RWMutex 是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。

    type RWMutex struct {
    	w           Mutex  // held if there are pending writers
    	writerSem   uint32 // semaphore for writers to wait for completing readers
    	readerSem   uint32 // semaphore for readers to wait for completing writers
    	readerCount int32  // number of pending readers
    	readerWait  int32  // number of departing readers
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • w — 复用互斥锁提供的能力;
    • writerSemreaderSem — 分别用于写等待读和读等待写:
    • readerCount 存储了当前正在执行的读操作数量;
    • readerWait 表示当写操作被阻塞时等待的读操作个数;

    加锁/解锁

    • func (rw *RWMutex) RLock() :读加锁,如果有写锁,则阻塞等待

      func (rw *RWMutex) RLock() {
      	if race.Enabled {
      		_ = rw.w.state
      		race.Disable()
      	}
      	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
      		// 阻塞,等待信号量的v操作释放共享内存,才能获得执行权
      		runtime_SemacquireMutex(&rw.readerSem, false, 0)
      	}
      	if race.Enabled {
      		race.Enable()
      		race.Acquire(unsafe.Pointer(&rw.readerSem))
      	}
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
    • func (rw *RWMutex) RUnlock():解读锁,

      func (rw *RWMutex) RUnlock() {
      	if race.Enabled {
      		_ = rw.w.state
      		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
      		race.Disable()
      	}
      	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
      		// Outlined slow-path to allow the fast-path to be inlined
      		rw.rUnlockSlow(r)
      	}
      	if race.Enabled {
      		race.Enable()
      	}
      }
      
      func (rw *RWMutex) rUnlockSlow(r int32) {
      	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
      		race.Enable()
      		throw("sync: RUnlock of unlocked RWMutex")
      	}
      	// A writer is pending.
      	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
      		// The last reader unblocks the writer.
      		runtime_Semrelease(&rw.writerSem, false, 1)
      	}
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
    • func (rw *RWMutex) Lock(): 写锁,如果有读写锁被占用,阻塞等待所有读写锁释放后才能获得

      • 其他 Goroutine 在获取写锁时会进入自旋或者休眠
      • 有其他 Goroutine 持有互斥锁的读锁该 Goroutine 会调用 runtime.sync_runtime_SemacquireMutex 进入休眠状态等待所有读锁所有者执行结束后释放 writerSem 信号量将当前协程唤醒;
    func (rw *RWMutex) Lock() {
    	if race.Enabled {
    		_ = rw.w.state
    		race.Disable()
    	}
    	// First, resolve competition with other writers.
    	rw.w.Lock()
    	// Announce to readers there is a pending writer.
    	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
    	// Wait for active readers.
    	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
    		runtime_SemacquireMutex(&rw.writerSem, false, 0)
    	}
    	if race.Enabled {
    		race.Enable()
    		race.Acquire(unsafe.Pointer(&rw.readerSem))
    		race.Acquire(unsafe.Pointer(&rw.writerSem))
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    example

    func RMutex() {
    	ch := make(chan struct{})
    	rw := &sync.RWMutex{}
    	go func() {
    		rw.RLock()
    		time.Sleep(time.Second * 5)
    		defer rw.RUnlock()
    		fmt.Println("fun1")
    	}()
    
    	go func() {
    		time.Sleep(time.Millisecond * 500)
    		rw.Lock()
    		defer rw.Unlock()
    		fmt.Println("fun2")
    		close(ch)
    	}()
    
    	<-ch
    }
    
    // 先打印出fun1 再打印fun2 代表了读写互斥
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
  • 相关阅读:
    201.回溯算法:全排列(力扣)
    数据校验(初级篇)
    剑指 Offer II 024. 反转链表
    网络安全(黑客)自学
    微服务和注册中心
    Linux配置strongSwan
    前端框架的发展史可以追溯到早期的静态网页时代
    【Matlab笔记_18】函数处理不同输出变量
    Leetcode刷题方法总结---字符串全解
    Vue-62、Vue技术路由守卫
  • 原文地址:https://blog.csdn.net/qiu18610714529/article/details/134233502