什么是并发安全

并发情况下,多个线程或协程会同时操作同一个资源,例如变量、数据结构、文件等。如果不保证并发安全,就可能导致数据竞争、脏读、脏写、死锁、活锁、饥饿等一系列并发问题,产生重大的安全隐患,比如12306抢到同一张火车票、多个用户抢到只剩一件库存的商品。而并发安全就是为了避免这些问题。Golang 中有一些原则和工具来保证并发安全,例如:

  1. 遵循“通过通信来共享内存,而不是通过共享内存通信”的理念,尽量使用 channel 来传递数据,而不是使用共享变量。
  2. 如果必须使用共享变量,那么要使用合理的锁来避免数据竞争。
  3. 如果使用锁,要注意锁的粒度和范围,尽量减少锁的持有时间和影响范围,避免死锁和活锁。

关于更为详细的并发安全性:可以参考:理解Golang 赋值的并发安全性

资源竞争

所有资源竞争就是多个 goroutine 访问某个共享的资源,我们来看一个资源竞争的例子:

var wg sync.WaitGroup

func add(count *int) {
	defer wg.Done()
	for i := 0; i < 10000; i++ {
		*count = *count + 1
	}
}

func main() {
	count := 0
	wg.Add(3)
	for i := 0; i < 3; i++ {
		go add(&count)
	}

	wg.Wait()
	fmt.Println(count)
}

该程序的每一次执行结果都不同, 就是因为协程之间出现了资源竞争,在读取更新 count 这个过程中,被其他协程横插了一脚,改变了 count 的值,没有保证原子性。下面我们通过互斥锁来锁住在读取更新过程的 count 的值,来使 count 的值打印正确。

互斥锁和读写互斥锁

sync 包提供了通过 sync.Mutexsync.RWMutex 来实现互斥锁和读写互斥锁。

sync 互斥锁(sync.Mutex)是一种最简单的锁类型,当一个 goroutine 获得了资源后,其他 goroutine 就只能等待这个 goroutine 释放该资源。互斥锁可以保证对共享资源的原子访问,避免并发冲突。

sync 读写互斥锁(sync.RWMutex)是一种更复杂的锁类型,它允许多个 goroutine 同时获取读锁,但只允许一个 goroutine 获取写锁。读写互斥锁适用于读多写少的场景下,它比互斥锁更高效。

sync.Mutex

sync.Mutex 使用 Lock() 加锁,Unlock() 解锁,如果对未解锁的 Mutex 使用 Lock() 会阻塞当前程序运行,我们来看加入了互斥锁后的程序:

var wg sync.WaitGroup
var l sync.Mutex

func add(count *int) {
	defer wg.Done()
	l.Lock() // 锁住 count 资源,阻塞程序运行,直到 Unlock
	for i := 0; i < 10000; i++ {
		*count = *count + 1
	}
	l.Unlock()
}

func main() {
	count := 0
	wg.Add(3)
	for i := 0; i < 3; i++ {
		go add(&count)
	}

	wg.Wait()
	fmt.Println(count)
}

sync.RWMutex

  1. RWMutex 是单写多读锁,该锁可以加多个读锁或者一个写锁。
  2. 读锁占用的情况下会阻止写,不会阻止读,多个 goroutine 可以同时获取资源,使用 RLockRUnlock 加锁解锁。
  3. 写锁会阻止其他 goroutine 进来,读写不论,整个锁住的资源由该 goroutine 独占,使用 LockUnlock 加锁解锁。
  4. 应该只在频繁读取,少量写入的情况下使用读写互斥锁
var m sync.RWMutex
var i = 0

func main() {
	go write()
	go write()
	go read()
	go read()
	go read()
	time.Sleep(2 * time.Second)
}

func read() {
	fmt.Println(i, "我准备获取读锁了")
	m.RLock()
	fmt.Println(i, "我要开始读数据了,所有写数据的都需要等待1s")
	time.Sleep(1 * time.Second)
	m.RUnlock()
	fmt.Println(i, "我已经释放了读锁,可以继续写数据了")
}

func write() {
	fmt.Println(i, "我准备获取写锁了")
	m.Lock()
	fmt.Println(i, "我要开始写数据了,所有人都需要等待1s")
	time.Sleep(1 * time.Second)
	i++
	m.Unlock()
	fmt.Println(i, "我已经释放了写锁,你们可以继续了")
}

// 结果
0 我准备获取读锁了
0 我要开始读数据了,所有写数据的都需要等待1s
0 我准备获取读锁了
0 我要开始读数据了,所有写数据的都需要等待1s
0 我准备获取读锁了
0 我要开始读数据了,所有写数据的都需要等待1s
0 我准备获取写锁了
0 我准备获取写锁了
0 我已经释放了读锁,可以继续写数据了
0 我已经释放了读锁,可以继续写数据了
0 我已经释放了读锁,可以继续写数据了
0 我要开始写数据了,所有人都需要等待1s
1 我已经释放了写锁,你们可以继续了

读写互斥锁有点难以理解,但是只要记住读写互斥永远是互斥的,就理解了大半。为了应对读锁长久占用,导致写锁迟迟不能更新数据,导致并发饥饿问题,所以在 Golang 的读写互斥锁中,写锁比读锁优先级更高。

sync.once

sync.once 是一个极为强大的功能,它可以确保一个函数只能被执行一次。通常做来在并发执行前初始化一次的共享资源。

func main() {
	once := &sync.Once{}
	for i := 0; i < 10; i++ {
		go func(i int) {
			once.Do(func() {
				fmt.Printf("i的值 %d\n", i)
			})
		}(i)
	}

	time.Sleep(1 * time.Second)
}

这段代码始终只会打印一次 i 的值。

原子操作

为了实现变量值的并发情况下安全赋值,除了互斥锁外,Golang 还提供了 atomic 包,他能保证在变量在读写时不受其他 goroutine 影响。atomic 是通过 CPU 指令在硬件层面上实现的,比互斥锁性能更好。当然,互斥锁一般来说是对代码块的并发控制,atomic 是对某个变量的并发控制,二者侧重点不同。另外,atomic 是一个很底层的包,除非在一些非常追求的性能的地方,否则其他地方都不推荐使用。

atomic.Add

add 方法比较容易理解,就是对一个值进行增加操作:

func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

使用示例:

var a int32 = 1
atomic.AddInt32(&a, 2)
fmt.Println(a)          // 输出3
atomic.AddInt32(&a, -1) // delta 是负值的话会减少该值
fmt.Println(a)          // 输出2

atomic.CompareAndSwap

CompareAndSwap用作比较置换值,如果等于,则更新值,返回 true,否则返回 false:

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

使用示例:

var (
	a int32 = 1
	b bool
)
b = atomic.CompareAndSwapInt32(&a, 1, 2)
fmt.Println(a) // 输出2
fmt.Println(b) // 输出true
b = atomic.CompareAndSwapInt32(&a, 1, 3)
fmt.Println(a) // 输出2
fmt.Println(b) // 输出false

atomic.Swap

Swap方法不比较,直接置换值:

func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)

使用示例:

var (
	a   int32 = 1
	old int32
)
old = atomic.SwapInt32(&a, 2)
fmt.Println(a)   // 输出2
fmt.Println(old) // 输出1

atomic.Load

Load 用来读取值:

func LoadInt32(addr *int32) (val int32)
func LoadInt64(addr *int64) (val int64)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)

使用示例:

var (
	a     int32 = 1
	value int32
)
value = atomic.LoadInt32(&a)
fmt.Println(value) // 输出1

atomic.Store

Store 用来将一个值存到变量中,Load 不会读取到存到一半的值:

func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)

使用示例:

var a int32
atomic.StoreInt32(&a, 1)
fmt.Println(a) // 输出1

atomic.Value

Value 实现了对任意值的存储、读取、置换、比较置换:

func (v *Value) Store(val any)
func (v *Value) Load() (val any)
func (v *Value) Swap(new any) (old any)
func (v *Value) CompareAndSwap(old, new any)

使用示例:

var v atomic.Value
v.Store(1)
fmt.Println(v.Load()) // 1
v.Swap(2)
fmt.Println(v.Load()) // 2
b := v.CompareAndSwap(2, 3)
fmt.Println(v.Load()) // 3
fmt.Println(b)

使用Swap置换值时,必须要保持原有的数据类型,否则就会 panic: sync/atomic: swap of inconsistently typed value into Value [recovered]。

需要注意的是,atomic.value 对于复杂的数据结构不能保证原子操作,如切片、映射等。

sync.map

go 在并发下,同时读 map 是安全的,但是读写 map 会引发竞争,导致 panic: fatal error: concurrent map read and map write。

// 创建一个map
m := make(map[int]int)
// 开启两个协程不停的对map写入数据
go func() {
	for {
		m[1] = 1
	}
}()
go func() {
	for {
		_ = m[1]
	}
}()
for {
}

// 结果
fatal error: concurrent map read and map write

为了解决这个问题,可以在写 map 之前加入锁:

l := sync.Mutex{}

l.Lock()
m[1] = 1
l.Unlock()

这样处理程序上运行是没问题了,但是性能并不高。go 在 1.9 版本中加入了效率较高的并发安全:sync.map:

func (m *Map) Store(key, value any) // 储存一个数据
func (m *Map) Load(key any) (value any, ok bool) // 读取一个数据
func (m *Map) Delete(key any) // 删除一个数据
func (m *Map) Range(f func(key, value any) bool) // 遍历数据

实例:

var smap sync.Map
// 保存数据
smap.Store("shanghai", 40000)
smap.Store("nanjing", 10000)
smap.Store("wuhan", 20000)
smap.Store("shenzhen", 30000)
// 读取值
if v, ok := smap.Load("nanjing"); ok {
	fmt.Printf("键名:%s,值:%v\n", "nanjing", v)
}
// 删除
smap.Delete("wuhan")
if v, ok := smap.Load("wuhan"); !ok {
	fmt.Printf("键名:%s,值:%v\n", "wuhan", v)
}
// 遍历数据
smap.Range(func(k, v interface{}) bool {
	fmt.Printf("键名:%s,值:%v\n", k, v)
	return true
})

// 结果
键名:nanjing,值:10000
键名:wuhan,值:<nil>
键名:shenzhen,值:30000
键名:shanghai,值:40000
键名:nanjing,值:10000

sync.map 并没有获取长度的方法,只能在遍历的时候自行计算。

本系列文章:

  1. Go 并发编程 - Goroutine 基础 (一)
  2. Go 并发编程 - 并发安全(二)
  3. Go 并发编程 - runtime 协程调度(三)
11-01 03:30