go中的并发安全锁oncemap

在Go代码中可能会存在多个 goroutine同时操作一个资源(临界区),这种情况会发生 竞态问题(数据竞态)。类比现实生活中的例子有十字路口被各个方向的的汽车竞争;还有火车上的卫生间被车厢里的人竞争。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
var x int64
var wg sync.WaitGroup

func add() {
	for i := 0; i < 50000; i++ {
		x = x + 1
	}
	wg.Done()
}
func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

上面的代码中我们开启了两个 goroutine去累加变量x的值,这两个 goroutine在访问和修改 x变量的时候就会存在数据竞争,导致最后的结果与期待的不符(小于十万)。

互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个 goroutine可以访问共享资源。Go语言中使用 sync包的 Mutex类型来实现互斥锁。 使用互斥锁来修复上面代码的问题:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
	for i := 0; i < 5000; i++ {
		lock.Lock() // 加锁
		x = x + 1
		lock.Unlock() // 解锁
	}
	wg.Done()
}
func main() {
	wg.Add(2)
	go add()
	go add()
	wg.Wait()
	fmt.Println(x)
}

使用互斥锁能够保证同一时间有且只有一个 goroutine进入临界区,其他的 goroutine则在等待锁;当互斥锁释放后,等待的 goroutine才可以获取锁进入临界区,多个 goroutine同时等待一个锁时,唤醒的策略是随机的。

Mutex是一个结构体类型,如果要作为参数时一定要使用指针成为引用传递,不然传递的就是副本,是另一个锁了

读写互斥锁

互斥锁的本质是当一个goroutine访问的时候,其他goroutine都不能访问。这样在资源同步,避免竞争的同时也降低了程序的并发性能。程序由原来的并行执行变成了串行执行。

互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加互斥锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用 sync包中的 RWMutex类型。(读的时候如果什么锁都不加,可能读取的资源是修改前的资源)

**读写锁可以让多个读操作并发,同时读取,但是对于写操作是完全互斥的。**也就是说,当一个goroutine进行写操作的时候,其他goroutine既不能进行读操作,也不能进行写操作。当一个goroutine进行读操作的时候,其他goroutine也可以进行读操作,但是任何一个goroutine都不能进行写操作。处于读锁定状态,那么针对它的写锁定操作将永远不会成功,且相应的goroutine也会被一直阻塞,直到读锁解锁。因为它们是互斥的。

读写锁示例:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
var (
	x      int64
	wg     sync.WaitGroup
	lock   sync.Mutex
	rwlock sync.RWMutex
)

func write() {
	// lock.Lock()   // 加互斥锁
	rwlock.Lock() // 加写锁
	x = x + 1
	time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
	rwlock.Unlock()                   // 解写锁
	// lock.Unlock()                     // 解互斥锁
	wg.Done()
}

func read() {
	// lock.Lock()                  // 加互斥锁
	rwlock.RLock()               // 加读锁
	time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
	rwlock.RUnlock()             // 解读锁
	// lock.Unlock()                // 解互斥锁
	wg.Done()
}

func main() {
	start := time.Now()
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go write()
	}

	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go read()
	}

	wg.Wait()
	end := time.Now()
	fmt.Println(end.Sub(start))
}

需要注意的是读写锁非常适合读多写少的场景,如果读和写的操作差别不大,读写锁的优势就发挥不出来。

sync.WaitGroup

在代码中生硬的使用 time.Sleep肯定是不合适的,Go语言中可以使用 sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:

image.png

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用Done()方法将计数器减1。通过调用Wait()来等待并发任务执行完,当计数器值为0时,表示所有并发任务已经完成。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
var wg sync.WaitGroup

func hello() {
	defer wg.Done()
	fmt.Println("Hello Goroutine!")
}
func main() {
	wg.Add(1)
	go hello() // 启动另外一个goroutine去执行hello函数
	wg.Wait()  //等待子协程完成才继续运行下面的代码
	fmt.Println("main goroutine done!")
}

需要注意 sync.WaitGroup是一个结构体,传递的时候要传递指针。

sync.Once

在编程的很多场景下我们需要确保某些操作在高并发的场景下只执行一次,例如只加载一次配置文件、只关闭一次通道等。

Go语言中的 sync包中提供了一个针对只执行一次场景的解决方案–sync.Once

sync.Once只有一个 Do方法,其签名如下:

1
func (o *Once) Do(f func()) {}

备注:如果要执行的函数 f需要传递参数就需要搭配闭包来使用。因为once方法使用是十分苛刻的,它的参数只能是无参函数,因此如果要用在关闭一次通道时,就需要用 func (){close(ch1)}

加载配置文件示例

延迟一个开销很大的初始化操作到真正用到它的时候再执行是一个很好的实践。因为预先初始化一个变量(比如在init函数中完成初始化)会增加程序的启动耗时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就不是必须要做的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
var icons map[string]image.Image

func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}

// Icon 被多个goroutine调用时不是并发安全的
// 可能同时有多个goroutine都判断其为nil,都进行了一次初始化操作
func Icon(name string) image.Image {
	if icons == nil {
		loadIcons()
	}
	return icons[name]
}

多个 goroutine并发调用Icon函数时不是并发安全的,现代的编译器和CPU可能会在保证每个 goroutine都满足串行一致的基础上自由地重排访问内存的顺序。loadIcons函数可能会被重排为以下结果:

1
2
3
4
5
6
7
func loadIcons() {
	icons = make(map[string]image.Image)
	icons["left"] = loadIcon("left.png")
	icons["up"] = loadIcon("up.png")
	icons["right"] = loadIcon("right.png")
	icons["down"] = loadIcon("down.png")
}

在这种情况下就会出现即使判断了 icons不是nil也不意味着变量初始化完成了(第一个goroutine刚进行完make操作第二个goroutine就进行了判断,此时icons不是nil,但是它并没有进行赋值)。考虑到这种情况,我们能想到的办法就是添加互斥锁,保证初始化 icons的时候不会被其他的 goroutine操作,但是这样做又会引发性能问题。因此可以使用once来只加一次锁。

once是一个结构体,它有两个成员,一个互斥锁和一个标志位,当第一个goroutine执行once时,标志位为false,那么它就对其加互斥锁,此时其他goroutine只能等待第一个goroutine解锁,once执行完后标志位变为true并解锁,然后后面的goroutine进来判断,此时标志位是true,就不会执行once了。这就实现了多个协程只运行一次once

使用 sync.Once改造的示例代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
var icons map[string]image.Image

var loadIconsOnce sync.Once

func loadIcons() {
	icons = map[string]image.Image{
		"left":  loadIcon("left.png"),
		"up":    loadIcon("up.png"),
		"right": loadIcon("right.png"),
		"down":  loadIcon("down.png"),
	}
}

// Icon 是并发安全的
func Icon(name string) image.Image {
	loadIconsOnce.Do(loadIcons)
	return icons[name]
}

sync.Map

Go语言中内置的map不是并发安全的。开启少量几个 goroutine进行并发读写的时候可能没什么问题,当并发多了之后(超过20次后)执行上面的代码就会报 fatal error: concurrent map writes错误。

像这种场景下就需要为map加锁来保证并发的安全性了,Go语言的 sync包中提供了一个开箱即用的并发安全版map–sync.Map。**开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。**同时 sync.Map内置了诸如 StoreLoadLoadOrStoreDeleteRange等操作方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
var m = sync.Map{}

func main() {
	wg := sync.WaitGroup{}
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(n int) {
			key := strconv.Itoa(n)
			m.Store(key, n)
			value, _ := m.Load(key)
			fmt.Printf("k=:%v,v:=%v\n", key, value)
			wg.Done()
		}(i)
	}
	wg.Wait()
}

原子操作

代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高。针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法,它在用户态就可以完成,**因此性能比加锁操作更好。**Go语言中原子操作由内置的标准库 sync/atomic提供。

atomic包

image.png

比较并交换操作是比较addr和old的值是否相等,如果相等返回true,并将new的值赋值给addr

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
var x int64
var wg sync.WaitGroup

func main() {
	wg.Add(10000)
	for i := 0; i < 10000; i++ {
		go func() {
			atomic.AddInt64(&x, 1)
			wg.Done()
		}()
	}
	wg.Wait()
	fmt.Println(x)
}

atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者sync包的函数/类型实现同步更好。

Licensed under CC BY-NC-SA 4.0