编写时间 2020.10 月,当时技术有限深度还是不足,文章仅提供思路

共享内存和信号量

共享内存是进程间通信最简单的方式之一,允许两个或更多进程访问同一块内存,我们可以通过给不同进程返回指向同一个物理内存区域的指针来做到这一点, 当一个进程改变了这块地址中的内容的时候,其他进程都会察觉到这个更改。

但是使用共享内存的时候,若一个进程正在向共享内存写入数据,则在它完成这一步操作之前,别的进程都不应该去读写这些数据,否则会造成数据覆盖或者难以预料的错误。

为了防止多进程竞争共享资源而造成的数据错乱,需要一个保护机制使得共享的资源在任意时刻只能被一个进程访问,正好信号量就实现了这一保护机制。

信号量其实是一个整型的计数器,主要用于实现进程间的互斥与同步,而不是用于缓存进程间通信的数据

信号量表示资源的数量,控制信号量的方式有两种原子操作:

  • P 操作:这个操作会把信号量 -1,相减后如果信号量 < 0,则表明资源已被占用,进程需堵塞等待,如果信号量 ≥ 0,代表还有资源可用,进程可继续正常运行
  • V 操作,这个操作会把信号量加上 1,相加后如果信号量 ≤ 0,则表明当前有阻塞中的进程,于是会将该进程唤醒运行;相加后如果信号量 > 0,则表明当前没有阻塞中的进程

P 操作是在进入共享资源之前,V 操作是在离开共享资源之后,这两个操作必须同时出现

Mutex初级版本源码解读

我从 Github 上能拉到最早的带有 Mutex 的相关代码为 Go 1.4,加上注释也才 109 行,非常小巧

使用示例

mutex := &sync.Mutex{}
mutex.Lock()
// ..do something
mutex.Unlock()

和 Slice 一样,Go 中的 Mutex 本质上还是一个结构体,而且零值 Mutex 为未上锁状态,Mutex一旦被使用禁止被拷贝

其中 Mutex 中的 state 字段代表当前锁的状态,而 sema 代表控制锁的信号量

type Mutex struct {
	state int32
	sema  uint32
}

const (
	mutexLocked = 1 << iota // mutex is locked
	mutexWoken
	mutexWaiterShift = iota
)

state 是一个 32 位的整型字段

  • 最低位代表 Locked 状态,0 表示未上锁,1 表示已上锁
  • 倒数第二位 Woken 状态,0 表示未唤醒,1 表示已唤醒
type Locker interface {
	Lock()
	Unlock()
}

func (m *Mutex) Lock() {
	// Fast path: grab unlocked mutex.
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if raceenabled {
			raceAcquire(unsafe.Pointer(m))
		}
		return
	}

	awoke := false
	for {
		old := m.state
		new := old | mutexLocked
		if old&mutexLocked != 0 {
			new = old + 1<<mutexWaiterShift
		}
		if awoke {
			new &^= mutexWoken
		}
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			if old&mutexLocked == 0 {
				break
			}
			runtime_Semacquire(&m.sema)
			awoke = true
		}
	}

	if raceenabled {
		raceAcquire(unsafe.Pointer(m))
	}
}

func (m *Mutex) Unlock() {
	if raceenabled {
		_ = m.state
		raceRelease(unsafe.Pointer(m))
	}

	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if (new+mutexLocked)&mutexLocked == 0 {
		panic("sync: unlock of unlocked mutex")
	}

	old := new
	for {
		// If there are no waiters or a goroutine has already
		// been woken or grabbed the lock, no need to wake anyone.
		if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
			return
		}
		// Grab the right to wake someone.
		new = (old - 1<<mutexWaiterShift) | mutexWoken
		if atomic.CompareAndSwapInt32(&m.state, old, new) {
			runtime_Semrelease(&m.sema)
			return
		}
		old = m.state
	}
}

上锁的流程分为 fast-pathslow-path

先通过调用 CompareAndSwapInt32 来竞争更新 m.state,成功则获得锁,如果失败则进入 slow-path 阶段

CompareAndSwapInt32 是属于 atomic 的函数,也是一种原子操作,函数原型为

*func CompareAndSwapInt32(addr int32, old, new int32) (swapped bool)

第一个参数的值应该是指向被操作值的指针值,后两个参数代表被操作值的旧值和新值,函数被调用之后会先判断参数 addr 指向的被操作值和参数 old 的值是否相等,仅当此判断得到肯定的结果之后,该函数才会用参数 new 代表的新值替换掉原先的旧值,否则后面的替换操作都会被忽略

所以我们能知道 Mutex 是互斥排他锁且 不可重入 ,当我们在 goroutine 获取同一把锁会导致 deadlock 的情况出现

下面的 raceenabled 是 Go 的一个编译特性,用来标识是否启用数据竞争检测,先跳过这一段,当第一步 CAS 上锁失败后,表明 Mutex 已经被上锁,那么 stategoroutine 等待数加一,更新当前的锁状态,并将其放入队列等待被唤醒,等到另一个 g 调用了 Unlock 方法之后,当前 g 被唤醒,然后设置 awoken=true ,再执行一遍循环后,此时 locked 位就是未上锁状态,同时 new 代表上锁,然后清除 woken 位,然后再 CAS 更新 newstate 上,因为之前是未上锁状态,那么就代表抢锁成功,直接返回。