最新消息:XAMPP默认安装之后是很不安全的,我们只需要点击左方菜单的 "安全"选项,按照向导操作即可完成安全设置。

互斥锁Mutex实现

XAMPP新闻 admin 391浏览 0评论

mutex是什么

Mutex即我们常说的互斥锁,也称为排他锁。使用互斥锁,可以限定临界区只能同时有一个goroutine持有。当临界区已经有一个goroutine持有的时候,其他goroutine如果想进入此临界区,会等待或者返回失败。直到持有的goroutine退出临界区,等待goroutine中的某一个才有机会进入临界区执行代码。

对于如下代码,同时开启两个goroutine执行c.add操作,假设运行运行的机器是多核的,那这两个goroutine很可能是并行执行的,但是在执行c.count++时是串行的,只有获取到锁的goroutine才会执行,另一个会等待。

drf29

package main

import (
 "fmt"
 "sync"
)

type counter struct {
 count int
 sync.Mutex
}

func (c *counter) add() {
 c.Lock()
 defer c.Unlock()

 c.count++
}

func (c *counter) value() int {
 c.Lock()
 defer c.Unlock()

 return c.count
}

func main() {
 var wg sync.WaitGroup
 var c counter

 wg.Add(2)

 // goroutine 1
 go func() {
  defer wg.Done()

  for i := 0; i < 5000; i++ {
   c.add()
  }
 }()

 // goroutine 2
 go func() {
  defer wg.Done()

  for i := 0; i < 5000; i++ {
   c.add()
  }
 }()

 wg.Wait()

 fmt.Println(c.value())
}

mutex数据结构

Mutex结构定义如下,它由state和sema两个字段组成,state表示当前互斥锁的状态,sema是用来控制锁状态的信号量。

type Mutex struct {
 state int32
 sema  uint32
}

const (
 // state的第一个bit位,表示是否加锁
 mutexLocked = 1 << iota // mutex is locked
 // state的第二个bit位,表示是否被唤醒
 mutexWoken
 // state的第三个bit位,表示是否处于饥饿模式
 mutexStarving
 // state的[4,32]bit位,表示等待锁的goroutine的数量
 mutexWaiterShift = iota
 // 饥饿时间1毫秒
 starvationThresholdNs = 1e6
)

state是一个int32整数,它的最低3个bit分别表示是否已经加锁、是否唤醒、是否处于饥饿状态,剩余bit位的数值表示有多少个goroutine在等待,如下图所示。

drf029

mutex实现原理

为了保证锁的公平性,mutex有两种模式:正常模式和饥饿模式。正常模式下所有等待锁的goroutine按照队列的先进先出顺序等待。被唤醒的goroutine并不是直接拥有锁,而是与新请求锁的goroutine竞争。为啥要让新请求锁的goroutine也来竞争锁,而不是直接放到队列尾部呢?因为这看起来很公平,但从性能角度上来看,并不最优的。如果我们把锁交给正在占用CPU时间片的goroutine,这样就不需要做上下文切换,在高并发的情况下可能有更好的性能。

将唤醒的goroutine和新来请求锁的goroutine竞争锁,很可能导致唤醒的goroutine在竞争锁的时候失败,即新来的goroutine抢到了锁。唤醒的goroutine又被重新加入到队列的队头。那如果极端情况下会出现唤醒的goroutine一致抢不到锁,所以为了处理这种情况,设置了饥饿模式,如果唤醒goroutine等待超过1毫秒没有获取到锁,将会进入饥饿模式。

饥饿模式将锁的所有权直接从释放锁的goroutine移交给等待队列中的第一个goroutine。新来请求锁的goroutine不会尝试获取锁,即使锁看起来处于解锁状态,也不会进行自旋,而是直接放到队尾。

什么时候又从饥饿模式切换到正常模式呢?如果一个等待中的goroutine获得了锁并且满足下面两个条件之一,会从饥饿模式切换到正常模式。

  1. 当前的goroutine是队列中最后一个goroutine
  2. 当前的goroutine等待时间小于1ms

下面结合源码(Go1.14版本)看Mutex的实现细节。先来看加锁处理逻辑,实现如下。处理两个步骤:

  1. 如果是空锁状态,即当前还没有任何goroutine进行加锁,则直接cas将state设置为加锁状态,
  2. 否则走lockSlow逻辑。

步骤1也就是happy path,处理简单的情景,复杂的逻辑封装到一个单独的含失踪,对应到这里的步骤2. 这样步骤1可以内联,Go源码中很多地方都用到了这种处理方式,值得我们学习使用。

func (m *Mutex) Lock() {
 // happy path, 还没有人获取锁,即只有当前程序在获取锁,可以直接获取到
 // 通过CAS操作设置m.state值,从0修改为1
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  if race.Enabled {
   race.Acquire(unsafe.Pointer(m))
  }
  return
 }
 m.lockSlow()
}

lockSlow处理逻辑很复杂,有太多的状态需要判断。整个核心处理就是一个for{}大循环,内部有很多处理细节。这里我们for循环中的逻辑分为3个逻辑块,方便理解,这三个逻辑块分别是自旋处理、计算目标状态值和设置目标状态。另外我们关注循环出口地方只有两处,其他地方都会继续执行循环。

自旋处理

自旋就是忙等待空转,让线程在某段时间内一直保持执行,从而避免线程上下文切换带来的开销。所以对于线程阻塞很短时间的场景是非常合适的。循环开始首先检查是否可以进行自旋的条件。条件1:锁已经被锁定 条件2:锁不处于饥饿模式,即处于正常模式 条件3:满足自旋条件runtime_canSpin

只有上述条件都满足才会开始自旋,自旋处理在runtime_doSpin

func (m *Mutex) lockSlow() {
 // 记录goroutine等待的时间
 var waitStartTime int64
 // 标记当前的goroutine是否处于饥饿状态
 starving := false
 // goroutine是否被唤醒
 awoke := false
 // 自旋的次数
 iter := 0
 old := m.state
 for {
  // 满足这里的3个条件进行自旋:
  // 1. 互斥锁已经被锁定,即有goroutine正在占用锁
  // 2. 互斥锁当前不处于饥饿模式
  // 3. 满足尝试自旋的条件runtime_canSpin
  if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   // 走到这里,说明当前的互斥锁处于正常模式,如果当前互斥锁还没有被唤醒,则标记为唤醒状态
   // 唤醒的goroutine就是当前的goroutine. 通过CAS操作,将互斥锁更新为唤醒状态
   if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
   }
   // 进行自旋
   runtime_doSpin()
   // 自旋次数+1
   iter++
   // 再次获取锁的状态
   old = m.state
   continue
  }
   ...
 }

 if race.Enabled {
  race.Acquire(unsafe.Pointer(m))
 }
}

我们来看runtime_canSpin可以进行自旋的条件,该函数实现在runtime包中的proc.go文件,实现函数为下面的sync_runtime_canSpin函数。根据实现总结出以下情况会终止自旋:

  1. 已经自旋执行了多次,具体执行自旋超过4次会停止
  2. 单核CPU也不会自旋,在单核CPU下因为没有其他goroutine运行,持有锁的goroutine没有运行,当前抢锁的goroutine自旋也是不抢到的
  3. 没有其他正在运行的P
  4. 当前P的G队列为空
    上述情况1避免长时间自旋浪费CPU的情况,情况2和3用来保证除了当前在运行的Goroutine之外,还有其他Goroutine在运行。情况4避免自旋锁等待的条件是由当前P的其他G来触发,这样会导致自旋变得没有意义,因为条件永远无法触发。
func sync_runtime_canSpin(i int) bool {
  // sync_runtime_canSpin函数中在以下四种情况返回false

  // 1. 已经执行了很多次
  // 2. 是单核CPU
  // 3. 没有其他正在运行的P
  // 4. 当前P的G队列为空
 // active_spin为4
 if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
  return false
 }
 if p := getg().m.p.ptr(); !runqempty(p) {
  return false
 }
 return true
}

接着来看执行自旋的逻辑runtime_doSpin,runtime_doSpin实现也在runtime/proc.go文件,它调用了procyield函数,procyield是汇编实现的,处理逻辑是调用PAUSE指令30次,通过该指令消耗CPU时间。

func sync_runtime_doSpin() {
 // active_spin_cnt为30
 procyield(active_spin_cnt)
}

TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE
    SUBL    $1, AX
    JNZ again
    RET
计算目标状态值

互斥锁经过特殊的自旋逻辑处理后,会根据上下文计算当前互斥量的最新状态new。根据不同的条件更新状态字段mutexlocked、mutexstarting、mutexwoken和mutexwaitershift中存储的不同信息。

func (m *Mutex) lockSlow() {
 ...
 for {
   ...
  // 走到这里有四种情况:
  // 1. 当前互斥锁处于正常模式,并且锁还没有被释放
  // 2. 当前互斥锁处于饥饿模式,并且锁还没有被释放
  // 3. 当前互斥锁处于正常模式,并且锁已经被释放
  // 4. 当前互斥锁处于饥饿模式,并且锁已经被释放
  new := old
 
  // 之前处于非饥饿状态,新状态进行加锁
  if old&mutexStarving == 0 {
   new |= mutexLocked
  }
  // 之前处于加锁状态或是饥饿状态,当前的goroutine需要等待
  // 之前处于加锁状态,则当前的goroutine还是不会获取到锁的,所以waiter+1
  // 之前是饥饿状态,则当前的goroutine不会设置新状态锁,因为锁被转移给队列中的第一个goroutine.
  if old&(mutexLocked|mutexStarving) != 0 {
   new += 1 << mutexWaiterShift
  }
  
  // 如果当前的goroutine正处于饥饿模式并且旧状态中锁还是处于锁定状态
  // 则将新状态标记为饥饿模式
  if starving && old&mutexLocked != 0 {
   new |= mutexStarving
  }
  // 如果当前的goroutine已经设置为唤醒状态,需要清除新状态中的唤醒标记,因为走到这里,
  // 当前的goroutine要么拿到锁,要么会进入休眠状态,反正不在是唤醒状态
  if awoke {
   if new&mutexWoken == 0 {
    throw("sync: inconsistent mutex state")
   }
   // 当前的goroutine已经唤醒了,重置清理掉new中的唤醒标志
   new &^= mutexWoken
  }
   ...
 }

 if race.Enabled {
  race.Acquire(unsafe.Pointer(m))
 }
}

设置目标状态

计算最新的目标值new后,通过CAS更新state的值。如果更新成功,则旧状态未加锁,且锁不处于饥饿状态,说明当前goroutine竞争成功,获得锁返回。这就是为什么goroutine在正常模式下竞争时更有可能获得锁的原因。如果当前goroutine竞争失败,则使用runtime_SemacquireMutex来保证资源不被两个goroutines获取。runtime_SemacquireMutex 实现在runtime包中的sema.go文件。它是semacquire1的简单封装,里面最后会调用goPark让当前goroutine让出执行权限,同时设置当前goroutine为睡眠状态,即不参与调度。休眠之后某个时刻锁被释放此goroutine被唤醒,计算它是否处处于饥饿状态,如果锁已经处于饥饿状态,抢到锁返回。

func (m *Mutex) lockSlow() {
    ...
 for {
  ...
  // 成功设置为新状态
  if atomic.CompareAndSwapInt32(&m.state, old, new) {
   // 原来锁的状态已经释放并且不是处于饥饿状态,当前的goroutine请求到了锁的所有权直接返回
   if old&(mutexLocked|mutexStarving) == 0 {
    break // locked the mutex with CAS
   }
   
   // queueLifo表示当前的goroutine以前是否在队列里面,queueLifo为true表示以前就在队列里面
   queueLifo := waitStartTime != 0
   if waitStartTime == 0 {
    // 当前的goroutine以前不在队列里面,将当前的时间保存起来
    waitStartTime = runtime_nanotime()
   }
   // 休眠当前goroutine等待,如果是新的goroutine,它的queueLifo为false,会加入到
   // 等待队列的尾部 ;如果是唤醒的goroutine,它的queueLifo为true,会加入到等待队列的队头
   runtime_SemacquireMutex(&m.sema, queueLifo, 1)
   // 休眠之后被唤醒,计算它是否处处于饥饿状态
   starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
   old = m.state
   // 如果锁已经处于饥饿状态,抢到锁返回
   if old&mutexStarving != 0 {
    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
     throw("sync: inconsistent mutex state")
    }
    // delta为7,二级制为0111,此时old二进制最低3位为100,所以old-delta,
    // 相当于waiter减1并加锁
    delta := int32(mutexLocked - 1<<mutexWaiterShift)
    // 如果没有处于饥饿模式或者是最后一个waiter,需要将锁设置为正常模式
    if !starving || old>>mutexWaiterShift == 1 {
     delta -= mutexStarving
    }
    // 设置state为新状态,因为已经获得了锁,直接break返回
    atomic.AddInt32(&m.state, delta)
    break
   }
   awoke = true
   iter = 0
  } else {
   old = m.state
  }
 }

 if race.Enabled {
  race.Acquire(unsafe.Pointer(m))
 }
}

上面分析完了加锁的处理流程,现在我们一起来看解锁处理流程。Unlock会先检查最简单的情况,持有锁的只有1个goroutine,没有其他goroutine抢锁。这种情况直接修改state的值进行解锁。其他情况走unlockSlow逻辑。如果锁处于饥饿模式,直接唤醒等待队列中的goroutine.如果锁处于正常状态,如果没有waiter,或者已经有在处理的情况了,那么释放就好,不用做额外的处理。否则,waiter数减 1,并设置唤醒标志,通过CAS操作更新state的值,然后执行runtime_Semrelease唤醒一个goroutine返回。

func (m *Mutex) Unlock() {
 if race.Enabled {
  _ = m.state
  race.Release(unsafe.Pointer(m))
 }

 // 解锁
 new := atomic.AddInt32(&m.state, -mutexLocked)
 // 如果new为0,说明只有当前的执行体持有锁,不用做其他的处理
 // 如果new非0,需要执行其他操作,尝试唤醒等待者
 if new != 0 {
  m.unlockSlow(new)
 }
}

func (m *Mutex) unlockSlow(new int32) {
 if (new+mutexLocked)&mutexLocked == 0 {
  throw("sync: unlock of unlocked mutex")
 }
 // 没有处于饥饿模式,即处于正常模式
 if new&mutexStarving == 0 {
  old := new
  for {
   // 下面四个条件满足任一一个就直接返回,不做唤醒其他goroutine处理:
   // 1. 没有waiter了,所以也就没有唤醒的对象了
   // 2. 锁处于锁定状态,表明被其他goroutine抢到了
   // 3. 锁处于唤醒状态,表明有其他等待的goroutine被唤醒
   // 4. 锁处于饥饿模式下,将会解锁队列头的goroutine
   if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    return
   }
   // 将等待的waiter数减一,并设置为唤醒状态
   new = (old - 1<<mutexWaiterShift) | mutexWoken
   // 原子操作设置m.state为新状态new
   if atomic.CompareAndSwapInt32(&m.state, old, new) {
    // 唤醒一个waiter然后返回
    runtime_Semrelease(&m.sema, false, 1)
    return
   }
   // 执行CAS操作失败会走到这里,操作失败的原因是锁的状态已经被别的goroutine改变了
   // 这里更新状态后执行下一个循环
   old = m.state
  }
 } else {
  // 处于饥饿状态,直接唤醒队列头部的goroutine
  runtime_Semrelease(&m.sema, true, 1)
 }
}

总结

Mutex实现使用了CAS+自旋操作+信号量技术,通过正常模式和饥饿模式兼顾公平和性能。在正常模式下,主打性能,被唤醒的goroutine并不是直接拥有锁,而是与新请求锁的goroutine竞争。把锁交给正在占用CPU时间片的goroutine,这样就不需要做上下文切换。在饥饿模式下,保证等待时间最久的goroutine在锁被释放时优先执行,保证goroutine不会因等锁而饿死。

转载请注明:XAMPP中文组官网 » 互斥锁Mutex实现

您必须 登录 才能发表评论!