快速了解go-sync包相关知识
1. sync.map
1.1 基本使用
Go 语言原生 map 并不是线程安全的,对它进行并发读写操作的时候,需要加锁。而 sync.map
则是一种并发安全的 map,在 Go 1.9 引入
package main
import (
"fmt"
"sync"
)
func main() {
var m sync.Map
// 1. 写入
m.Store("qcrao", 18)
m.Store("stefno", 20)
// 2. 读取
age, _ := m.Load("qcrao")
fmt.Println(age.(int))
// 3. 遍历
m.Range(func(key, value interface{}) bool {
name := key.(string)
age := value.(int)
fmt.Println(name, age)
return true
})
// 4. 删除
m.Delete("qcrao")
age, ok := m.Load("qcrao")
fmt.Println(age, ok)
// 5. 读取或写入
m.LoadOrStore("stefno", 100)
age, _ = m.Load("stefno")
fmt.Println(age)
}
1.2 源码分析
摘抄自:https://go.cyub.vip/concurrency/sync-map.html
sync.Map的结构:
type Map struct {
mu Mutex // 排他锁,用于对dirty map操作时候加锁处理
read atomic.Value // read map
// dirty map。新增key时候,只写入dirty map中,需要使用mu
dirty map[interface{}]*entry
// 用来记录从read map中读取key时miss的次数
misses int
}
sync.Map结构体中read字段是atomic.Value
类型,底层是readOnly结构体:
type readOnly struct {
m map[interface{}]*entry
amended bool // 当amended为true时候,表示sync.Map中的key也存在dirty map中
}
read map和dirty map的value类型是*entry, entry结构体定义如下:
// expunged用来标记从dirty map删除掉了
var expunged = unsafe.Pointer(new(interface{}))
type entry struct {
// 如果p == nil 说明对应的entry已经被删除掉了, 且m.dirty == nil
// 如果 p == expunged 说明对应的entry已经被删除了,但m.dirty != nil,且该entry不存在m.dirty中
// 上述两种情况外,entry则是合法的值并且在m.read.m[key]中存在
// 如果m.dirty != nil,entry也会在m.dirty[key]中
// p指针指向sync.Map中key对应的Value
p unsafe.Pointer // *interface{}
}
对Map的操作可以分为四类:
- Add key-value 新增key-value
- Update key-value 更新key对应的value值
- Get Key-value 获取Key对应的Value值
- Delete Key 删除key
我们来看看新增和更新操作:
// Store用来新增和更新操作
func (m *Map) Store(key, value interface{}) {
read, _ := m.read.Load().(readOnly)
// 如果read map存在该key,且该key对应的value不是expunged时(准确的说key对应的value, value是*entry类型,entry的p字段指向不是expunged时),
// 则使用cas更新value,此操作是原子性的
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock() // 先加锁,然后重新读取一次read map,目的是防止dirty map升级到read map(并发Load操作时候),read map更改了。
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok { // 若read map存在此key,此时就是map的更新操作
if e.unexpungeLocked() { // 将value由expunged更改成nil,
// 若成功则表明dirty map中不存在此key,把key-value添加到dirty map中
m.dirty[key] = e
}
e.storeLocked(&value) // 更改value。value是指针类型(*entry),read map和dirty map的value都指向该值。
} else if e, ok := m.dirty[key]; ok {// 若dirty map存在该key,则直接更改value
e.storeLocked(&value)
} else { // 若read map和dirty map中都不存在该key,其实就是map的新增key-value操作
if !read.amended {// amended为true时表示sync.Map部分key存在dirty map中
// dirtyLocked()做两件事情:
// 1. 若dirty map等于nil,则初始化dirty map。
// 2. 遍历read map,将read map中的key-value复制到dirty map中,从read map中复制的key-value时,value是nil或expunged的(因为nil和expunged是key删除了的)不进行复制。
// 同时若value值为nil,则顺便更改成expunged(用来标记dirty map不包含此key)
// 思考🤔:为啥dirtyLocked()要干事情2,即将read map的key-value复制到dirty map中?
m.dirtyLocked()
// 该新增key-value将添加dirty map中,所以将read map的amended设置为true。当amended为true时候,从sync.Map读取key时候,优先从read map中读取,若read map读取时候不到时候,会从dirty map中读取
m.read.Store(readOnly{m: read.m, amended: true})
}
// 添加key-value到dirty map中
m.dirty[key] = newEntry(value)
}
// 释放锁
m.mu.Unlock()
}
func (e *entry) tryStore(i *interface{}) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
func (e *entry) storeLocked(i *interface{}) {
atomic.StorePointer(&e.p, unsafe.Pointer(i))
}
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[interface{}]*entry, len(read.m))
for k, e := range read.m {
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
接下来看看Map的Get操作:
// Load方法用来获取key对应的value值,返回的ok表名key是否存在sync.Map中
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended { // 若key不存在read map中,且dirty map包含sync.Map中key情况下
m.mu.Lock() // 加锁
read, _ = m.read.Load().(readOnly) // 再次从read map读取key
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key] // 从dirty map中读取key
// missLocked() 首先将misses计数加1,misses用来表明read map读取key没有命中的次数。
// 若misses次数多于dirty map中元素个数时候,则将dirty map升级为read map,dirty map设置为nil, amended置为false
m.missLocked()
}
m.mu.Unlock()
}
if !ok { // read map 和 dirty map中都不存在该key
return nil, false
}
// 加载value值
return e.load()
}
func (e *entry) load() (value interface{}, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged { // 若value值是nil或expunged,返回nil, false,表示key不存在
return nil, false
}
return *(*interface{})(p), true
}
func (m *Map) missLocked() {
m.misses++
if m.misses < len(m.dirty) {
return
}
// 新创建一个readOnly对象,其中amended为false, 并将m.dirty直接赋值给该对象的m字段,
// 这也是上面思考中的dirtyLocked为什么要干事情2的原因,因为通过2操作之后,m.dirty已包含read map中的所有key,可以直接拿来创建readOnly。
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
在接着看看Map的删除操作:
// Delete用于删除key
func (m *Map) Delete(key interface{}) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
// 若read map不存在该key,但dirty map中存在该key。则直接调用delete,删除dirty map中该key
if !ok && read.amended {
delete(m.dirty, key)
}
m.mu.Unlock()
}
if ok {
e.delete()
}
}
func (e *entry) delete() (hadValue bool) {
for {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged { // 若entry中p已经是nil或者expunged则直接返回
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, nil) { // 将entry中的p设置为nil
return true
}
}
}
sync.Map还提供遍历key-value功能:
// Range方法接受一个迭代回调函数,用来处理遍历的key和value
func (m *Map) Range(f func(key, value interface{}) bool) {
read, _ := m.read.Load().(readOnly)
if read.amended { // 若dirty map中包含sync.Map中key时候
m.mu.Lock()
read, _ = m.read.Load().(readOnly)
if read.amended {// 加锁之后,再次判断,是为了防止并发调用Load方法时候,dirty map升级为read map时候,amended为false情况
// read.amended为true的时候,m.dirty包含sync.Map中所有的key
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
m.mu.Unlock()
}
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) { //执行迭代回调函数,当返回false时候,停止迭代
break
}
}
}
1.3 为什么不使用sync.Mutex+map实现并发的map呢?
这个问题可以换个问法就是sync.Map相比sync.Mutex+map实现并发map有哪些优势?
sync.Map优势在于当key存在read map时候,如果进行Store操作,可以使用原子性操作更新,而sync.Mutex+map形式每次写操作都要加锁,这个成本更高。
另外并发读写两个不同的key时候,写操作需要加锁,而读操作是不需要加锁的。
1.4 读少写多情况下并发map,应该怎么设计?
这种情况下,可以使用分片锁,跟据key进行hash处理后,找到其对应读写锁,然后进行锁定处理。通过分片锁机制,可以降低锁的粒度来实现读少写多情况下高并发。可以参见orcaman/concurrent-map实现。
1.5 总结
sync.map
是线程安全的,读取,插入,删除也都保持着常数级的时间复杂度。sync.map
的零值是有效的,并且零值是一个空的 map。在第一次使用之后,不允许被拷贝。- 通过读写分离,降低锁时间来提高效率,适用于读多写少的场景
- Range 操作需要提供一个函数,参数是
k,v
,返回值是一个布尔值:f func(key, value interface{}) bool
- 底层是通过空间换时间的方式,使用 read 和 dirty 两个 map 来进行读写分离,降低锁时间来提高效率。
- 用 Load 或 LoadOrStore 函数时,如果在 read 中没有找到 key,则会将 misses 值原子地增加 1,当 misses 增加到和 dirty 的长度相等时,会将 dirty 提升为 read。以期减少“读 miss”。
- 新写入的 key 会保存到 dirty 中,如果这时 dirty 为 nil,就会先新创建一个 dirty,并将 read 中未被删除的元素遍历拷贝到 dirty(这就是为啥写操作比较耗时的原因)
-
当 dirty 为 nil 的时候,read 就代表 map 所有的数据;当 dirty 不为 nil 的时候,dirty 才代表 map 所有的数据
- sync.Map采用空间换时间策略。其底层结构存在两个map,分别是read map和dirty map。当读取操作时候,优先从read map中读取,是不需要加锁的,若key不存在read map中时候,再从dirty map中读取,这个过程是加锁的。当新增key操作时候,只会将新增key添加到dirty map中,此操作是加锁的,但不会影响read map的读操作。当更新key操作时候,如果key已存在read map中时候,只需无锁更新更新read map就行,负责加锁处理在dirty map中情况了。总之sync.Map会优先从read map中读取、更新、删除,因为对read map的读取不需要锁
- 当sync.Map读取key操作时候,若从read map中一直未读到,若dirty map中存在read map中不存在的keys时,则会把dirty map升级为read map,这个过程是加锁的。这样下次读取时候只需要考虑从read map读取,且读取过程是无锁的
- 延迟删除机制,删除一个键值时只是打上删除标记,只有在提升dirty map为read map的时候才清理删除的数据
- sync.Map中的dirty map要么是nil,要么包含read map中所有未删除的key-value。
- sync.Map适用于读多写少场景。根据包官方文档介绍,它特别适合这两个场景:1. 一个key只写入一次但读取多次时,比如在只会增长的缓存中;2. 当多个goroutine读取、写入和更新不相交的键值对时。
2. sync.pool
参考资料
- 深度解密 Go 语言之 sync.map
- 深度解密 Go 语言之 sync.Pool
- 浅谈 Golang sync 包的相关使用方法
- sync - 处理同步需求
- sync.map 源码分析(非常推荐阅读)