Once

Once保证只能执行一个操作,多余的操作都会被忽略调。情景:单例模式的实例创建、初始化配置文件、消息队列nsq删除channel……

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
type Once struct {
	done uint32
	m    Mutex
}
func (o *Once) Do(f func()) {
	if atomic.LoadUint32(&o.done) == 0 {
		o.doSlow(f) //保证操作执行完后才done,保证确定done后其他操作才离开
	}
}
func (o *Once) doSlow(f func()) {
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

下面的Do,当多个f同时竞争Do时,其他f在未等待 winner f执行结束时离开

1
2
3
4
5
func (o *Once) Do(f func()) {
		if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
			f()
		}
}

Map

结构体

Sync.Map不会直接存储value,而是包裹了一层entry,即<key,*entry>,entry指向value

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type Map struct {
	mu Mutex
	read atomic.Value // readOnly
  //dirty和read的value都是存指针,指向同一地址时,修改时只需改一处即可
	dirty map[interface{}]*entry 
	misses int
}
// readOnly不是不可以修改,而是不可删除
// 即表示删除value而已
type readOnly struct {
	m       map[interface{}]*entry
	amended bool // 说明admend==false说明,dirty为空
}
// expunged is an arbitrary pointer that marks entries which have been deleted
// from the dirty map.
// 只为了表明read的key已在dirty删除
// 只有在将read复制到dirty过才可能此情况出现
var expunged = unsafe.Pointer(new(interface{}))

// An entry is a slot in the map corresponding to a particular key.
type entry struct {
	p unsafe.Pointer // *interface{},指向Map的value
}

基本操作原理

Sync.Map<key,value>,复制也是浅复制。entry->p->value

  • 初始

    Read nil amended=false,dirty为nil

    dirty nil

  • store k1=1

    Read nil

    dirty. k1=1 amended=true,dirty非nil

  • Store k2=2

    read nil amended=true

    Dirty k1=1 k2=2.

  • store k3=3

    read nil

    dirty. k1=1, k2=2. k3=3.

  • Load k1

    从dirty找到k1,read命中失败,dismiss=1

  • Load k2

    从dirty找到k2,read命中失败,dismiss=2

  • Load k3

    从dirty找到k3,read命中失败,dismiss=3=len(dirty),dirty提升为read,dirty指向nil。浅复制,指针。amended=false,dirty没有对read补充的key

    Read k1=1, k2=2. k3=3.

    dirty. nil

  • store k4

    新插入数据,amended==false,此时dirty一定指向nil,将read已删除或标记为expunged数据据复制到read,并将read的key中entry.p=nil即value为nil的元素标记为expunged,表示该元素在dirty中不存在

    Read k1=1, k2=2. k3=3.

    dirty. k1=1, k2=2. k3=3. k4=4

  • Delete k1

    Read k1=nil, k2=2. k3=3. 注意:read和dirty的k1的entry都指向同一个内存空间

    dirty. k1=nil, k2=2. k3=3. k4=4

  • load k4

    从dirty找到k4,read命中失败,dismiss=1

  • Load k4

    从dirty找到k4,read命中失败,dismiss=2

  • Load k4

    从dirty找到k4,read命中失败,dismiss=3

  • Load k4

    从dirty找到k4,read命中失败,dismiss=4。将dirty提升为read,amended=false

    read. k1=nil, k2=2. k3=3. k4=4

    Dirty nil

  • Store k5

    read. k1=expunged, k2=2. k3=3. k4=4. Amended=true

    dirty k2=2. k3=3. k4=4. k5=5

  • Delete k2

    read. k1=expunged, k2=nil. k3=3. k4=4. Amended=true

    dirty k2=nil. k3=3. k4=4. k5=5

  • Delete k5

    read不存在k5,而在k2

    read. k1=expunged, k2=nil. k3=3. k4=4. Amended=true

    dirty k2=nil. k3=3. k4=4

nil和expunged区别

  1. Entry.p==nil,key都在read和dirty;Entry.p==expunged时,key在read而不在dirty

  2. store时,更新数据,若Entry.p==nil,则 Entry.p = newValue 即可;若Entry.p==expunged,则还需要将key插入到dirty

    nil表示的是删除,expunged只是为了表明read的key在dirty不存在。

Map方法集分析

  • Store(key, value interface{})

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    
    func (m *Map) Store(key, value interface{}) {
      // read若存在key,且entry没有标志为删除,则更新entry.value即可。
      read, _ := m.read.Load().(readOnly)
      // tryStore,若read的key标志为删除,则更新失败。
      if e, ok := read.m[key]; ok && e.tryStore(&value) {
          return
      }
        
      m.mu.Lock()
      // 第二次检测red是否存在key,防止dirty刚刚提升为read
      read, _ = m.read.Load().(readOnly)
      if e, ok := read.m[key]; ok {
        // 取消key的expunged标志,expunged标志表示该ready的key在dirty中不存在
          if e.unexpungeLocked() {
          // read有,但dirty没有的key,需要添加
          // 添加后dirty和read的key都存在,故需要取消read的key的expunged标志
              m.dirty[key] = e
          }
        // 若e.unexpungeLocked()失败,则表示key在之前的dirty和read都能找到,它们指向的entry相同
          e.storeLocked(&value)
      } else if e, ok := m.dirty[key]; ok {
        //key只存在于dirty,更新dirty的entry
          e.storeLocked(&value)
      } else {
        /* read.amended==false,即Map.dirty==nil时,说明Map.dirty之前提升为read
        而且还没有新数据的插入。所以需要将read数据复制dirty,并将新数据插入到dirty,
        更新read.admended为true,表明dirty非空*/
          if !read.amended {
              m.dirtyLocked()
              m.read.Store(readOnly{m: read.m, amended: true})
          }
          m.dirty[key] = newEntry(value)
      }
      m.mu.Unlock()
    }
      
    

    取消expunged,只有entry.p==expunged才成功

    1
    2
    3
    
    func (e *entry) unexpungeLocked() (wasExpunged bool) {
      return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
    }
    

    将read复制到dirty

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    func (m *Map) dirtyLocked() {
      if m.dirty != nil {
          return
      }
      read, _ := m.read.Load().(readOnly)
      m.dirty = make(map[interface{}]*entry, len(read.m))
      for k, e := range read.m { 
        // 不复制已被删除的value,并给已删除value的添加删除标志
          if !e.tryExpungeLocked() {
              m.dirty[k] = e // 指针复制
          }
      }
    }
    

    尝试给entry添加删除标志,这里try表示尝试直到成功。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
  // 只有将read复制到dirty调用
  func (e *entry) tryExpungeLocked() (isExpunged bool) {
  	p := atomic.LoadPointer(&e.p)
  	for p == nil { 
      // 若entry.p==nil,表示value已删除
      // 可上expunged,表示read有的key,dirty没有
  		if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
  			return true
  		}
  		p = atomic.LoadPointer(&e.p)
  	}
  	return p == expunged
  }

entry更新value

1
2
3
  func (e *entry) storeLocked(i *interface{}) {
  	atomic.StorePointer(&e.p, unsafe.Pointer(i))
  }
  • Load(key interface{}) (value interface{}, ok bool)

    read命中失败 指的是 key在read中查找失败,而且在dirty中查找成功

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
      read, _ := m.read.Load().(readOnly)
      e, ok := read.m[key] // 先查询read是否存在key
      if !ok && read.amended {
          m.mu.Lock()
        // 二次检测,防止dirty刚刚提升为read
          read, _ = m.read.Load().(readOnly)
          e, ok = read.m[key]
          if !ok && read.amended {
          // 若read不存在key,而且dirty有补充,
              e, ok = m.dirty[key]
              m.missLocked() // 统计read命中失败次数
          }
          m.mu.Unlock()
      }
      if !ok {
          return nil, false
      }
      return e.load()
    }
    

    统计read命中失败

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    func (m *Map) missLocked() {
      m.misses++
      if m.misses < len(m.dirty) {
          return
      }
      // read命中失败次数大于等于dirty元素个数,则将dirty提升为read
      // 并将dirty指向nil,重置read的命中失败次数为0
      m.read.Store(readOnly{m: m.dirty})
      m.dirty = nil
      m.misses = 0
    }
    

    entry load sync.Map real value

    1
    2
    3
    4
    5
    6
    7
    
    func (e *entry) load() (value interface{}, ok bool) {
      p := atomic.LoadPointer(&e.p)
      if p == nil || p == expunged {
          return nil, false
      }
      return *(*interface{})(p), true
    }
    
  • LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)

    若果key的value存在,则返回,若不存在则存储value

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    
    func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
      // 若read有key,则直接拿
      read, _ := m.read.Load().(readOnly)
      if e, ok := read.m[key]; ok {
          actual, loaded, ok := e.tryLoadOrStore(value)
          if ok {
              return actual, loaded
          }
      }
      
      m.mu.Lock()
      // 二次检测,防止dirty刚刚提升为read
      read, _ = m.read.Load().(readOnly)
      if e, ok := read.m[key]; ok {
          if e.unexpungeLocked() {
              m.dirty[key] = e
          }
          actual, loaded, _ = e.tryLoadOrStore(value)
      } else if e, ok := m.dirty[key]; ok {
        // dirty有数据,新插入的key还没有提升为read元素
          actual, loaded, _ = e.tryLoadOrStore(value)
          m.missLocked()
      } else {
         /* read.amended==false,即Map.dirty==nil时,说明Map.dirty刚刚提升为read
        还没有新数据的插入。所以需要将read数据复制dirty,并将新数据插入到dirty,
        更新read.admended为true,表明read数据时不完整的,需要dirty补充。*/
          if !read.amended {
              m.dirtyLocked()
              m.read.Store(readOnly{m: read.m, amended: true})
          }
          m.dirty[key] = newEntry(value)
          actual, loaded = value, false
      }
      m.mu.Unlock()
      
      return actual, loaded
    }
    

    entry加载或存储元素

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    
    func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
      p := atomic.LoadPointer(&e.p)
      if p == expunged {
        // key在dirty已删除,key不再dirty中
          return nil, false, false
      }
      if p != nil {
          return *(*interface{})(p), true, true
      }
      ic := i
      for {
          if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
          //value已删除,直接覆写
              return i, false, true
          }
        // 循环尝试,直到成功
          p = atomic.LoadPointer(&e.p)
          if p == expunged {
              return nil, false, false
          }
          if p != nil {
              return *(*interface{})(p), true, true
          }
      }
    }
    
  • Delete(key interface{})

    删除key,value一定会被删除;可以删除value,保留key,即entry.p=nill。

    若删除前read存在key,则删除后dirty和read的key均保留;若read不存在key

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    
    func (m *Map) Delete(key interface{}) {
      read, _ := m.read.Load().(readOnly)
      e, ok := read.m[key]
      if !ok && read.amended {
          m.mu.Lock()
        // 二次检测,防止dirty刚刚提升为read
          read, _ = m.read.Load().(readOnly)
          e, ok = read.m[key]
          if !ok && read.amended {
          // read不存在key,且dirty不为空,key若在dirty则删除
              delete(m.dirty, key)
          }
          m.mu.Unlock()
      }
      if ok {
        //read只删除value,保留key
          e.delete()
      }
    }
    

    entry删除value,即entry.p=nil

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    func (e *entry) delete() (hadValue bool) {
      for {
          p := atomic.LoadPointer(&e.p)
          if p == nil || p == expunged { //entry.p已被删除
              return false
          }
          if atomic.CompareAndSwapPointer(&e.p, p, nil) {
              return true
          }
      }
    }
    
  • Range(f func(key, value interface{}) bool)

    遍历获取sync.Map的所有元素,使用的是快照数据,在多 goroutines 情况写入情况下未必是最新数据

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    
    func (m *Map) Range(f func(key, value interface{}) bool) {
      read, _ := m.read.Load().(readOnly)
      if read.amended {
          m.mu.Lock()
          read, _ = m.read.Load().(readOnly)
          if read.amended {
              read = readOnly{m: m.dirty}
              m.read.Store(read)
              m.dirty = nil
              m.misses = 0
          }
          m.mu.Unlock()
      }
      
      for k, e := range read.m {
          v, ok := e.load()
          if !ok {
              continue
          }
          if !f(k, v) {
              break
          }
      }
    }
    

总结

sync.Map适合多goroutines多读少写。此情况命中read情况高,无需加锁;只有增删数据或读取新添加数据需要加锁。(删除的key若在read中,也无需加锁)

Cond

关于sync.Cond官网介绍

最近在看Goroutine Pool(以下简称Pool) ants 项目的时候,发现使用了变量类型 sync.Cond。起初不知道什么意思,先是思考了 Pool 的实现过程,想到,如果 Task(任务) 多过 Worker(工人) 的时候,Pool 应该要等待有空闲的 Worke 才给其分配 Task。而 Pool 如何知道有空闲的 Worker 呢? 有以下两种方法可以实现

  • Pool 忙等,不停地循环检测 Woker 队列,若Length 大于0 则给 Worker 分配 Task
  • Worker被回收到队列后,给 Pool 发送一个信号,通知有我已回归队列

很显然方法而更感觉好些,ants 就是使用了方法二,以下是它 worker回归的方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// Worker 处理完 Task 后自己回到 Pool
func (p *PoolWithFunc) revertWorker(worker *goWorkerWithFunc) bool {
  if atomic.LoadInt32(&p.state) == CLOSED || p.Running() > p.Cap() {
    return false
  }
  worker.recycleTime = time.Now()
  p.lock.Lock()
  p.workers = append(p.workers, worker)

  p.cond.Signal() //Worker 回归队列后发送信号
  p.lock.Unlock()
  return true
}

下面是 Pool 给 Worker 分配 Task

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 取有空闲的 Worker
func (p *PoolWithFunc) retrieveWorker() *goWorkerWithFunc {
  var w *goWorkerWithFunc
  spawnWorker := func() {
    w = p.workerCache.Get().(*goWorkerWithFunc)
    w.run()
  }

  p.lock.Lock()
  idleWorkers := p.workers
  n := len(idleWorkers) - 1
  if n >= 0 { // 有空闲的 Worker,
    w = idleWorkers[n]
    idleWorkers[n] = nil
    p.workers = idleWorkers[:n]
    p.lock.Unlock()
  } else if p.Running() < p.Cap() {
    p.lock.Unlock() 
    // 无空闲Worker,但 Pool 还没到达上限,则创建新Worker
    spawnWorker()
  } else {
    if p.options.Nonblocking {
      p.lock.Unlock()
      return nil
    }
    // Pool 的 Worker 数量已达上限,则循环等待、尝试获取空闲 Worker
    Reentry:
    if p.options.MaxBlockingTasks != 0 && 
    		p.blockingNum >= p.options.MaxBlockingTasks {
      p.lock.Unlock()
      return nil
    }
    p.blockingNum++
    // 等待直到有空闲 Worker
    p.cond.Wait() 
    ...
  }
  return w
}

了解了sync.Cond 的用途后,我们来看看它的真面目:

​ sync.Cond 实现了一个变量,用于让多个 goroutines 等待某个事件信号。sync.Cond需要绑定一个Locker (如:互斥锁Mutex 、读写锁RWMutex 、自旋锁SpinLock)。三个 sync.Cond 的常用函数如下

  • Wait

    等待事件信号,需要先获取sync.Cond绑定的 Locker,原因看Wait函数的源代码

    1
    2
    3
    4
    5
    6
    7
    
    func (c *Cond) Wait() {
      c.checker.check()
      t := runtime_notifyListAdd(&c.notify)
      c.L.Unlock()
      runtime_notifyListWait(&c.notify, t)
      c.L.Lock()
    }
    
  • Signal

    事件信号,通知一个 Gorutine 继续前进

  • Broadcast

    广播信号,通知所有 Gorutine 可以继续前进

自旋锁 Spin Lock

wiki

所谓自旋,即无限循环抢占,亦可说忙等。忙等很浪费计算机资源,不适用于长时间持有锁的情况,但因为不会像互斥锁那样引起调用者睡眠,所以效率比较高。Golang需要自己实现自旋锁,代码是一种实现方式

代码来源

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
type spinLock uint32

func (sl *spinLock) Lock() {
	for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
		runtime.Gosched()
	}
}

func (sl *spinLock) Unlock() {
	atomic.StoreUint32((*uint32)(sl), 0)
}

// NewSpinLock instantiates a spin-lock.
func NewSpinLock() sync.Locker {
	return new(spinLock)
}

sync.Pool 对象池

顾名思义,存放对象的池,是为了对象重用,防止gc。操作 goroutines 安全。

常用的操作有以下两个

  • Get

    从 Pool 中取一个对象。若对象池没有对象,则有以下以下两种情况

    • 指定了New函数, New 一个对象返回;
    • 返回 nil
1
2
3
4
5
6
7
func (p *Pool) Get() interface{} {
  ...
  if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}

指定 New 函数

1
2
3
4
5
peopleCache.New = func() interface{} {
		return &People{
			Name: "我是阿晶",
		}
}
  • Put

    将对象回收到对象池

测试

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
type People struct {
	Name string
}

func main() {
	var peopleCache sync.Pool
	peopleCache.New = func() interface{} {
		return &People{
			Name: "我是阿晶",
		}
	}
	p := peopleCache.Get().(*People)
	fmt.Printf("%p\n", p)
	peopleCache.Put(p)

	p = peopleCache.Get().(*People)
	fmt.Printf("%p\n", p)
}

参考

官方Package sync

不得不知道的Golang之sync.Map源码分析