前言
在高并发场景下,势必会涉及到对公共资源的竞争。当对应场景发生时,我们经常会使用 mutex 的 Lock() 和 Unlock() 方法来占有或释放资源,互斥锁就是并发控制的一个基本手段,是为了避免竞争而建立的一种并发控制机制。简单来说就是控制并发
源码包
源码地址
mutex 的源码主要是在 src/sync/mutex.go文件里
基础定义
// 锁实例
type Mutex struct {
state int32 // state记录用来记录加锁状态的
sema uint32 // sema是信号量,一个非负数的全局变量,通过一个计数器来控制对共享资源的访问次数限制
}
// 实现加解锁2接口
type Locker interface {
Lock()
Unlock()
}
// 锁的状态定义与及参数
const (
mutexLocked = 1 << iota // 0001 已加锁
mutexWoken // 0010 是否有routine唤醒
mutexStarving // 0100 当前处于饥饿模式
mutexWaiterShift = iota // 3等待者数量的标志,最多可以阻塞2^29个goroutine
starvationThresholdNs = 1e6 // 1e6ns = 100000ns = 1ms ,既 如果当前goroutine在1ms内没有获取得到锁,判定为饥饿
)
加锁
/*
Lock
@Desc 加锁
*/
func (m *Mutex) Lock() {
// CAS加锁,如果state=0,直接将state设置为mutexLocked,表示加锁成功
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}
// 上面加锁失败,就走lockSlow
m.lockSlow()
}
/*
TryLock
@Desc 尝试加锁并返回结果
*/
func (m *Mutex) TryLock() bool {
// 锁定或者饥饿
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
// 尝试去获得锁
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return true
}
/*
lockSlow
@Desc 慢加锁
*/
func (m *Mutex) lockSlow() {
var waitStartTime int64 // 等待开始时间
starving := false // 是否饥饿
awoke := false // 是否唤醒
iter := 0 // 自旋次数
old := m.state // 旧状态
for {
// 第一个条件是state已被锁,但是不是饥饿状态。如果时饥饿状态,自旋时没有用的,锁的拥有权直接交给了等待队列的第一个。
// 第二个条件是还可以自旋,多核、压力不大并且在一定次数内可以自旋, 具体的条件可以参考`sync_runtime_canSpin`的实现。
// 如果满足这两个条件,不断自旋来等待锁被释放、或者进入饥饿状态、或者不能再自旋。
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 主动旋转很有意义,尝试设置mutexWoken标志。
// 以防止Unlock时唤醒其他阻塞的goroutine。
// 当前协程处于未醒状态 且 mutex也处于未唤醒状态且等待队列有等待协程
// 尝试将自己变成那个唤醒着的协程
// 尝试将mutex的状态设为唤醒 若成立 则将当前协程设置为唤醒状态(awoke变量=true)
// old>>mutexWaiterShift 取当前等待数量
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 主动自旋,调用一定数量的pause指令,cpu不会执行任何操作 但是消耗cpu时间
runtime_doSpin()
// 循环次数++
iter++
// 记录旧状态
old = m.state
continue
}
// 以下代码 试图修改锁状态
// 到了这一步, state的状态可能是:
// 1. 锁还没有被释放,锁处于正常状态
// 2. 锁还没有被释放, 锁处于饥饿状态
// 3. 锁已经被释放, 锁处于正常状态
// 4. 锁已经被释放, 锁处于饥饿状态
new := old
// 不要去尝试获得饥饿的锁,新的到达的goroutines必须排队。
// 非饥饿,尝试CAS获得锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 若旧状态处于锁定或者饥饿,新状态排队数量加1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// 如果starving=true并且old是上锁的,那么将new设置为饥饿状态
// starving=true 这里说明唤醒后抢锁失败又循环到这里了,并且被判断为饥饿
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
// goroutine已从睡眠中唤醒,设置new设置为非唤醒状态
if awoke {
// goroutine已经从睡眠中醒来,
// 所以我们需要在任何情况下重置标志。
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
// 尝试去获得锁
// 尝试更新state状态
// 注意new的锁标记不一定是true, 也可能只是标记一下锁的state是饥饿状态.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 如果之前不是饥饿状态,并且不是加锁状态
// 那么代表当前goroutine已经通过CAS成功获取了锁
// 直接break
if old&(mutexLocked|mutexStarving) == 0 {
break
}
// 到这里,表示未能上锁成功
// 如果开始等待时间!=0,则表示已经在等待队伍中,那就排在队伍前面。
queueLifo := waitStartTime != 0
// 第一次来waitStartTime=0,排在队尾
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 获取信号量 有可能陷入阻塞等待,把这个goroutine加入等待队列
// runtime_SemacquireMutex中的lifo为true,则将等待服务程序放在等待队列的开头。
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// 这个goroutine被唤醒
// 如果当前饥饿||超过饥饿设定时间,将goroutine状态设置成饥饿
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
// 如果当前是饥饿状态
if old&mutexStarving != 0 {
// 如果goroutine是唤醒或者饥饿
// 锁的所有权转移给当前goroutine,但是锁的状态还是处于不一致,mutexLocked没有设置
// 仍然认为是waiter,这个状态需要修复
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
// 当前goroutine获取锁,waiter数量-1
delta := int32(mutexLocked - 1<<mutexWaiterShift)
// 如果不是饥饿,获得当前goroutine是队列中最后一个goroutine
// 退出饥饿模式,并把状态设置成正常
if !starving || old>>mutexWaiterShift == 1 {
// 退出饥饿模式。
// 关键是在这里做,并考虑等待时间。
// 饥饿模式是如此低效,两个goroutines
// 当它们切换互斥对象时,可以无限地进行锁步进
// 退出饥饿模式
delta -= mutexStarving
}
//更新原值
atomic.AddInt32(&m.state, delta)
break
}
//不是饥饿模式,那么将唤醒设置为true,自旋次数设置为0,重新开始抢锁
awoke = true
iter = 0
} else {
// 如果CAS不成功,也就是说没能成功获得锁,锁被别的goroutine获得了或者锁一直没被释放
// 那么就更新状态,重新开始循环尝试拿锁
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
}
解锁
/*
Unlock
@Desc 删除锁
*/
func (m *Mutex) Unlock() {
// 如果没有上过锁,却调用此方法解锁,将会抛出运行时错误。
// 它将允许在不同的 Goroutine 上进行上锁解锁
if race.Enabled {
_ = m.state
race.Release(unsafe.Pointer(m))
}
// 尝试解锁
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// 慢解锁
m.unlockSlow(new)
}
}
/*
unlockSlow
@Desc 尝试删除锁
*/
func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
if new&mutexStarving == 0 {
old := new
for {
// 非饥饿模式,没有需要唤醒的队列
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// 唤醒等待队列并且数量-1
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// 饥饿模式:直接将所有权交给队列头的那个goroutine
runtime_Semrelease(&m.sema, true, 1)
}
}
参考文献
- 参考1:https://developer.aliyun.com/article/786406
- 参考2:https://zhuanlan.zhihu.com/p/413922019
- 参考3:https://www.cnblogs.com/ricklz/p/14535653.html
- 参考4:https://www.purewhite.io/2019/03/28/golang-mutex-source/
|