Once
Once保证只能执行一个操作,多余的操作都会被忽略调。情景:单例模式的实例创建、初始化配置文件、消息队列nsq删除channel……
|
|
下面的Do,当多个f同时竞争Do时,其他f在未等待 winner f执行结束时离开
|
|
Map
结构体
Sync.Map不会直接存储value,而是包裹了一层entry,即<key,*entry>,entry指向value
|
|
基本操作原理
Sync.Map<key,value>,复制也是浅复制。entry->p->value
-
初始
Read nil amended=false,dirty为nil
dirty nil
-
store k1=1
Read nil
dirty. k1=1 amended=true,dirty非nil
-
Store k2=2
read nil amended=true
Dirty k1=1 k2=2.
-
store k3=3
read nil
dirty. k1=1, k2=2. k3=3.
-
Load k1
从dirty找到k1,read命中失败,dismiss=1
-
Load k2
从dirty找到k2,read命中失败,dismiss=2
-
Load k3
从dirty找到k3,read命中失败,dismiss=3=len(dirty),dirty提升为read,dirty指向nil。浅复制,指针。amended=false,dirty没有对read补充的key
Read k1=1, k2=2. k3=3.
dirty. nil
-
store k4
新插入数据,amended==false,此时dirty一定指向nil,将read已删除或标记为expunged数据据复制到read,并将read的key中entry.p=nil即value为nil的元素标记为expunged,表示该元素在dirty中不存在
Read k1=1, k2=2. k3=3.
dirty. k1=1, k2=2. k3=3. k4=4
-
Delete k1
Read k1=nil, k2=2. k3=3. 注意:read和dirty的k1的entry都指向同一个内存空间
dirty. k1=nil, k2=2. k3=3. k4=4
-
load k4
从dirty找到k4,read命中失败,dismiss=1
-
Load k4
从dirty找到k4,read命中失败,dismiss=2
-
Load k4
从dirty找到k4,read命中失败,dismiss=3
-
Load k4
从dirty找到k4,read命中失败,dismiss=4。将dirty提升为read,amended=false
read. k1=nil, k2=2. k3=3. k4=4
Dirty nil
-
Store k5
read. k1=expunged, k2=2. k3=3. k4=4. Amended=true
dirty k2=2. k3=3. k4=4. k5=5
-
Delete k2
read. k1=expunged, k2=nil. k3=3. k4=4. Amended=true
dirty k2=nil. k3=3. k4=4. k5=5
-
Delete k5
read不存在k5,而在k2
read. k1=expunged, k2=nil. k3=3. k4=4. Amended=true
dirty k2=nil. k3=3. k4=4
nil和expunged区别
-
Entry.p==nil,key都在read和dirty;Entry.p==expunged时,key在read而不在dirty
-
store时,更新数据,若Entry.p==nil,则 Entry.p = newValue 即可;若Entry.p==expunged,则还需要将key插入到dirty
nil表示的是删除,expunged只是为了表明read的key在dirty不存在。
Map方法集分析
-
Store(key, value interface{})
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
func (m *Map) Store(key, value interface{}) { // read若存在key,且entry没有标志为删除,则更新entry.value即可。 read, _ := m.read.Load().(readOnly) // tryStore,若read的key标志为删除,则更新失败。 if e, ok := read.m[key]; ok && e.tryStore(&value) { return } m.mu.Lock() // 第二次检测red是否存在key,防止dirty刚刚提升为read read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok { // 取消key的expunged标志,expunged标志表示该ready的key在dirty中不存在 if e.unexpungeLocked() { // read有,但dirty没有的key,需要添加 // 添加后dirty和read的key都存在,故需要取消read的key的expunged标志 m.dirty[key] = e } // 若e.unexpungeLocked()失败,则表示key在之前的dirty和read都能找到,它们指向的entry相同 e.storeLocked(&value) } else if e, ok := m.dirty[key]; ok { //key只存在于dirty,更新dirty的entry e.storeLocked(&value) } else { /* read.amended==false,即Map.dirty==nil时,说明Map.dirty之前提升为read 而且还没有新数据的插入。所以需要将read数据复制dirty,并将新数据插入到dirty, 更新read.admended为true,表明dirty非空*/ if !read.amended { m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true}) } m.dirty[key] = newEntry(value) } m.mu.Unlock() }
取消expunged,只有entry.p==expunged才成功
1 2 3
func (e *entry) unexpungeLocked() (wasExpunged bool) { return atomic.CompareAndSwapPointer(&e.p, expunged, nil) }
将read复制到dirty
1 2 3 4 5 6 7 8 9 10 11 12 13
func (m *Map) dirtyLocked() { if m.dirty != nil { return } read, _ := m.read.Load().(readOnly) m.dirty = make(map[interface{}]*entry, len(read.m)) for k, e := range read.m { // 不复制已被删除的value,并给已删除value的添加删除标志 if !e.tryExpungeLocked() { m.dirty[k] = e // 指针复制 } } }
尝试给entry添加删除标志,这里try表示尝试直到成功。
|
|
entry更新value
|
|
-
Load(key interface{}) (value interface{}, ok bool)
read命中失败 指的是 key在read中查找失败,而且在dirty中查找成功
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
func (m *Map) Load(key interface{}) (value interface{}, ok bool) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] // 先查询read是否存在key if !ok && read.amended { m.mu.Lock() // 二次检测,防止dirty刚刚提升为read read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { // 若read不存在key,而且dirty有补充, e, ok = m.dirty[key] m.missLocked() // 统计read命中失败次数 } m.mu.Unlock() } if !ok { return nil, false } return e.load() }
统计read命中失败
1 2 3 4 5 6 7 8 9 10 11
func (m *Map) missLocked() { m.misses++ if m.misses < len(m.dirty) { return } // read命中失败次数大于等于dirty元素个数,则将dirty提升为read // 并将dirty指向nil,重置read的命中失败次数为0 m.read.Store(readOnly{m: m.dirty}) m.dirty = nil m.misses = 0 }
entry load sync.Map real value
1 2 3 4 5 6 7
func (e *entry) load() (value interface{}, ok bool) { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return nil, false } return *(*interface{})(p), true }
-
LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
若果key的value存在,则返回,若不存在则存储value
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { // 若read有key,则直接拿 read, _ := m.read.Load().(readOnly) if e, ok := read.m[key]; ok { actual, loaded, ok := e.tryLoadOrStore(value) if ok { return actual, loaded } } m.mu.Lock() // 二次检测,防止dirty刚刚提升为read read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok { if e.unexpungeLocked() { m.dirty[key] = e } actual, loaded, _ = e.tryLoadOrStore(value) } else if e, ok := m.dirty[key]; ok { // dirty有数据,新插入的key还没有提升为read元素 actual, loaded, _ = e.tryLoadOrStore(value) m.missLocked() } else { /* read.amended==false,即Map.dirty==nil时,说明Map.dirty刚刚提升为read 还没有新数据的插入。所以需要将read数据复制dirty,并将新数据插入到dirty, 更新read.admended为true,表明read数据时不完整的,需要dirty补充。*/ if !read.amended { m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true}) } m.dirty[key] = newEntry(value) actual, loaded = value, false } m.mu.Unlock() return actual, loaded }
entry加载或存储元素
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) { p := atomic.LoadPointer(&e.p) if p == expunged { // key在dirty已删除,key不再dirty中 return nil, false, false } if p != nil { return *(*interface{})(p), true, true } ic := i for { if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) { //value已删除,直接覆写 return i, false, true } // 循环尝试,直到成功 p = atomic.LoadPointer(&e.p) if p == expunged { return nil, false, false } if p != nil { return *(*interface{})(p), true, true } } }
-
Delete(key interface{})
删除key,value一定会被删除;可以删除value,保留key,即entry.p=nill。
若删除前read存在key,则删除后dirty和read的key均保留;若read不存在key
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
func (m *Map) Delete(key interface{}) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] if !ok && read.amended { m.mu.Lock() // 二次检测,防止dirty刚刚提升为read read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { // read不存在key,且dirty不为空,key若在dirty则删除 delete(m.dirty, key) } m.mu.Unlock() } if ok { //read只删除value,保留key e.delete() } }
entry删除value,即entry.p=nil
1 2 3 4 5 6 7 8 9 10 11
func (e *entry) delete() (hadValue bool) { for { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { //entry.p已被删除 return false } if atomic.CompareAndSwapPointer(&e.p, p, nil) { return true } } }
-
Range(f func(key, value interface{}) bool)
遍历获取sync.Map的所有元素,使用的是快照数据,在多 goroutines 情况写入情况下未必是最新数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
func (m *Map) Range(f func(key, value interface{}) bool) { read, _ := m.read.Load().(readOnly) if read.amended { m.mu.Lock() read, _ = m.read.Load().(readOnly) if read.amended { read = readOnly{m: m.dirty} m.read.Store(read) m.dirty = nil m.misses = 0 } m.mu.Unlock() } for k, e := range read.m { v, ok := e.load() if !ok { continue } if !f(k, v) { break } } }
总结
sync.Map适合多goroutines多读少写。此情况命中read情况高,无需加锁;只有增删数据或读取新添加数据需要加锁。(删除的key若在read中,也无需加锁)
Cond
最近在看Goroutine Pool(以下简称Pool) ants 项目的时候,发现使用了变量类型 sync.Cond。起初不知道什么意思,先是思考了 Pool 的实现过程,想到,如果 Task(任务) 多过 Worker(工人) 的时候,Pool 应该要等待有空闲的 Worke 才给其分配 Task。而 Pool 如何知道有空闲的 Worker 呢? 有以下两种方法可以实现
- Pool 忙等,不停地循环检测 Woker 队列,若Length 大于0 则给 Worker 分配 Task
- Worker被回收到队列后,给 Pool 发送一个信号,通知有我已回归队列
很显然方法而更感觉好些,ants 就是使用了方法二,以下是它 worker回归的方法
|
|
下面是 Pool 给 Worker 分配 Task
|
|
了解了sync.Cond 的用途后,我们来看看它的真面目:
sync.Cond 实现了一个变量,用于让多个 goroutines 等待某个事件信号。sync.Cond需要绑定一个Locker (如:互斥锁Mutex 、读写锁RWMutex 、自旋锁SpinLock)。三个 sync.Cond 的常用函数如下
-
Wait
等待事件信号,需要先获取sync.Cond绑定的 Locker,原因看Wait函数的源代码
1 2 3 4 5 6 7
func (c *Cond) Wait() { c.checker.check() t := runtime_notifyListAdd(&c.notify) c.L.Unlock() runtime_notifyListWait(&c.notify, t) c.L.Lock() }
-
Signal
事件信号,通知一个 Gorutine 继续前进
-
Broadcast
广播信号,通知所有 Gorutine 可以继续前进
自旋锁 Spin Lock
所谓自旋,即无限循环抢占,亦可说忙等。忙等很浪费计算机资源,不适用于长时间持有锁的情况,但因为不会像互斥锁那样引起调用者睡眠,所以效率比较高。Golang需要自己实现自旋锁,代码是一种实现方式
|
|
sync.Pool 对象池
顾名思义,存放对象的池,是为了对象重用,防止gc。操作 goroutines 安全。
常用的操作有以下两个
-
Get
从 Pool 中取一个对象。若对象池没有对象,则有以下以下两种情况
- 指定了New函数, New 一个对象返回;
- 返回 nil
|
|
指定 New 函数
|
|
-
Put
将对象回收到对象池
测试
|
|