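Analysis tools

  • go build -gcflags passes flags to the compiler
  • Useful -gcflags values
    • -N disables optimizations
    • -l disables inlining
  • go tool objdump -s "main" disassembles the binary and prints only the symbols that match the -s regular expression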
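
As a rough illustration of how these tools fit together, here is a minimal program to disassemble. The file name main.go, the binary name chandemo, and the helper roundTrip are assumptions made for this sketch, and the exact runtime symbol names in the output may differ between Go versions.

```go
// main.go: a small program for inspecting how channel operations compile.
//
// Build without optimizations or inlining, then disassemble roundTrip:
//
//	go build -gcflags "-N -l" -o chandemo main.go
//	go tool objdump -s "main.roundTrip" chandemo
//
// In the output, look for CALL instructions into the runtime, such as
// runtime.makechan, runtime.chansend1 and runtime.chanrecv1.
package main

import "fmt"

// roundTrip sends v through a one-element buffered channel and reads it back.
func roundTrip(v int) int {
	ch := make(chan int, 1)
	ch <- v
	return <-ch
}

func main() {
	fmt.Println(roundTrip(42))
}
```

Because -l disables inlining, roundTrip keeps its own symbol, which makes the -s filter easy to target.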

Key data structures

The channel structure

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16         // size of a single element, i.e. the size of its _type
	closed   uint32         // 0 means the channel is not closed
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	// (Author's note: I do not fully understand this explanation yet;
	// for now, simply treat lock as a mutex.)
	lock mutex
}
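
To make the roles of qcount, dataqsiz, sendx and recvx concrete, here is a minimal sketch of a circular queue that borrows those field names. The ring type and its methods are hypothetical and are not runtime code: there is no locking, there are no wait queues, and elements are plain ints.

```go
package main

import "fmt"

// ring is a simplified, hypothetical model of hchan's circular buffer.
type ring struct {
	buf      []int
	qcount   int // number of elements currently stored
	dataqsiz int // capacity of the circular queue
	sendx    int // index where the next element is written
	recvx    int // index where the next element is read
}

func newRing(n int) *ring {
	return &ring{buf: make([]int, n), dataqsiz: n}
}

// put stores v at sendx and reports false when the queue is full.
func (r *ring) put(v int) bool {
	if r.qcount == r.dataqsiz {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0 // wrap around
	}
	r.qcount++
	return true
}

// get removes the element at recvx and reports false when the queue is empty.
func (r *ring) get() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0 // wrap around
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.put(1)
	r.put(2)
	fmt.Println(r.put(3)) // false: the queue is full
	v, _ := r.get()
	fmt.Println(v)        // 1: first in, first out
	fmt.Println(r.put(3)) // true: the freed slot is reused after wrapping
}
```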

A queue of goroutines. It is needed because the receivers and senders of a channel are in a many-to-many relationship

type waitq struct {
	first *sudog
	last  *sudog
}

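A sudog waits on behalf of a goroutine. My personal reading of the name is sudog = sudo + goroutine, where sudo stands for "substitute user and do". The runtime source itself only describes a sudog as representing a g in a wait list.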

type sudog struct {
	g *g
  
	// whether the G was put on the wait queue because of selectgo
	isSelect bool
	next     *sudog
	prev     *sudog
	elem     unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently
	acquiretime int64
	releasetime int64
	ticket      uint32
	parent      *sudog // semaRoot binary tree
	waitlink    *sudog // g.waiting list or semaRoot
	waittail    *sudog // semaRoot
	c           *hchan // channel
}
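
The waitq above is a doubly linked FIFO of sudogs. The following is a minimal sketch of that queue discipline; waiter and queue are hypothetical stand-ins for sudog and waitq, and the code leaves out everything the runtime does around locking and select handling.

```go
package main

import "fmt"

// waiter is a hypothetical stand-in for sudog: one queue entry per wait.
type waiter struct {
	name string
	next *waiter
	prev *waiter
}

// queue is a hypothetical stand-in for waitq.
type queue struct {
	first *waiter
	last  *waiter
}

// enqueue appends w at the tail.
func (q *queue) enqueue(w *waiter) {
	w.next = nil
	tail := q.last
	if tail == nil {
		w.prev = nil
		q.first = w
		q.last = w
		return
	}
	w.prev = tail
	tail.next = w
	q.last = w
}

// dequeue removes and returns the head, or nil when the queue is empty.
func (q *queue) dequeue() *waiter {
	w := q.first
	if w == nil {
		return nil
	}
	q.first = w.next
	if q.first == nil {
		q.last = nil
	} else {
		q.first.prev = nil
	}
	w.next = nil
	return w
}

func main() {
	var q queue
	q.enqueue(&waiter{name: "G1"})
	q.enqueue(&waiter{name: "G2"})
	fmt.Println(q.dequeue().name) // G1: the longest-waiting entry is served first
	fmt.Println(q.dequeue().name) // G2
	fmt.Println(q.dequeue() == nil)
}
```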

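Characteristics and explanations

After a channel is closed, receives never block, but sends are not allowed

After a channel is closed, receives still return any remaining elements normally. Once the queue is empty, a receive does not block: it always returns the zero value, and the received flag is false. This property makes a closed channel usable as an exit signal. NSQ, for example, uses exitChan as its exit signal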

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
  if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep) // hand back the zero value
		}
		return true, false
	}
  ...
}
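
From user code, the behavior described above looks roughly like this. The helper drain and the variable names are made up for this sketch; exitChan only mirrors the naming mentioned for NSQ and is not taken from its source.

```go
package main

import "fmt"

// drain receives from ch until it is closed and empty, returning the values seen.
func drain(ch <-chan int) []int {
	var got []int
	for {
		v, ok := <-ch
		if !ok {
			return got // closed and empty: ok is false, v is the zero value
		}
		got = append(got, v)
	}
}

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	close(ch)
	fmt.Println(drain(ch)) // [1 2]: buffered elements survive the close

	v, ok := <-ch
	fmt.Println(v, ok) // 0 false: never blocks once closed and empty

	// Using close as an exit signal: every goroutine blocked on exitChan wakes up.
	exitChan := make(chan struct{})
	done := make(chan struct{})
	go func() {
		<-exitChan // returns as soon as exitChan is closed
		close(done)
	}()
	close(exitChan)
	<-done
	fmt.Println("worker exited")
}
```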

If the channel has already been closed, a send is handled as follows

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
  if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	...
}
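
The panic can be observed from user code with recover. This is only a small demonstration; sendOnClosed is a helper name invented for the sketch.

```go
package main

import "fmt"

// sendOnClosed sends on a closed channel and returns the recovered panic message.
func sendOnClosed() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int, 1)
	close(ch)
	ch <- 1 // panics even though the buffer has room
	return "no panic"
}

func main() {
	fmt.Println(sendOnClosed()) // send on closed channel
}
```

Note that the closed check comes before the buffer check, so free buffer space does not help.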

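Differences between an unbuffered chan and a chan with a one-element buffer

Memory allocation

An unbuffered chan allocates no space for buf; buf simply points back into the hchan itself. For a chan with a one-element buffer, buf points to allocated memory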

func makechan(t *chantype, size int) *hchan {
	...
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}
	var c *hchan
	switch {
	case mem == 0:
		// No buffer memory needed: unbuffered, e.g. make(chan int) or
		// make(chan int, 0). This branch is also taken when the element
		// size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Equivalent to c.buf = unsafe.Pointer(&c.buf). buf holds no data
		// here; the runtime source notes that the race detector uses this
		// location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Buffered, but the elements contain no pointers, e.g. make(chan int, 1).
		// The hchan and buf are allocated as one block of memory.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// The elements contain pointers: the hchan and buf are allocated separately.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	...
	return c
}
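
The allocation details are not visible from user code, but the resulting qcount and dataqsiz are, through the built-in len and cap. A small check, with describe as a helper name made up for this sketch:

```go
package main

import "fmt"

// describe reports the number of queued elements and the buffer capacity.
func describe(ch chan int) (length, capacity int) {
	return len(ch), cap(ch)
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 1)
	buffered <- 7

	l, c := describe(unbuffered)
	fmt.Println(l, c) // 0 0: no buffer at all

	l, c = describe(buffered)
	fmt.Println(l, c) // 1 1: one slot, currently occupied
}
```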
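
Sending a message

When a message is sent and no receiver is waiting, an unbuffered channel blocks the sender immediately

For an unbuffered channel there are generally two cases:

  • a receiver is waiting: the message is handed directly to the receiver;
  • otherwise the current goroutine is placed on the sendq wait queue and blocks.

For a channel with a buffer of length 1 there are generally three cases:

  • a receiver is waiting: again the message is handed directly to the receiver;
  • the buffer is not full: the message is placed in the buffer;
  • the buffer is full: the goroutine is placed on the sendq wait queue and blocks.

The corresponding paths in chansend:
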
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  
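	...
	// A receiver is already waiting, which means the buffer is empty:
	// hand the message directly to the receiver, bypassing the buffer.
	if sg := c.recvq.dequeue(); sg != nil {
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	// The buffer is not full: put the message into the buffer.
	if c.qcount < c.dataqsiz {
		...
		return true
	}
	...
	gp := getg() // get the current goroutine
	...
	// Put the current goroutine's sudog on the send wait queue.
	c.sendq.enqueue(mysg)
	...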
}
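
Two of these paths can be probed from user code with a non-blocking send, which reports failure exactly where a plain send would have blocked. trySend is a helper name invented for this sketch; the direct hand-off to a waiting receiver is not shown because it depends on goroutine scheduling.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // a plain send would have blocked here
	}
}

func main() {
	unbuffered := make(chan int)
	fmt.Println(trySend(unbuffered, 1)) // false: no receiver waiting and no buffer

	buffered := make(chan int, 1)
	fmt.Println(trySend(buffered, 1)) // true: the buffer has room
	fmt.Println(trySend(buffered, 2)) // false: the buffer is full
}
```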

Examples of the difference

func main() {
	ch := make(chan int)
	ch <- 1 // unbuffered: the main goroutine blocks immediately, causing a deadlock
}
Output:
	fatal error: all goroutines are asleep - deadlock!

With a one-element buffer, the same send succeeds:

func main() {
	ch := make(chan int, 1)
	ch <- 1 // one buffer slot: the main goroutine does not block and the program exits normally
}
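
For comparison, the unbuffered send completes as soon as some other goroutine is ready to receive. A minimal sketch, with sendUnbuffered as a helper name made up for the example:

```go
package main

import "fmt"

// sendUnbuffered sends v on an unbuffered channel to a receiver goroutine
// and returns the value that goroutine received.
func sendUnbuffered(v int) int {
	ch := make(chan int)
	result := make(chan int, 1)
	go func() {
		result <- <-ch // receive from ch, then report what arrived
	}()
	ch <- v // blocks only until the receiver goroutine is ready
	return <-result
}

func main() {
	fmt.Println(sendUnbuffered(1)) // 1
}
```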

Why is select over channels random?

First, select currently comes in three forms, and each form is compiled into different code. The random choice arises only in the third form

  • Only one channel case
func main() {
	c1 := make(chan int)
	go func() {
		time.Sleep(time.Second)
		c1 <- 1
	}()
	select {
	case v := <-c1:
		fmt.Printf("%d <- c1\n", v)
	}
}

After simplification, the code above is really just

func main() {
	c1 := make(chan int)
	go func() {
		time.Sleep(time.Second)
		c1 <- 1
	}()
	v := <-c1
	fmt.Printf("%d <- c1\n", v)	
}

You can compile both versions, disassemble them, and compare the functions they CALL: they are exactly the same

  • One channel case plus default. The channel case never blocks the G, because the block flag is false. If chanrecv finds no data, it returns false, false immediately instead of blocking the G

    func main() {
      c1 := make(chan int)
      go func() {
          time.Sleep(time.Second)
          c1 <- 1
      }()
      select {
      case v := <-c1:
          fmt.Printf("%d <- c1\n", v)
      default:
          fmt.Println("default")
      }
    }
    

    After compilation, the code above is transformed into

    func main() {
      c1 := make(chan int)
      go func() {
          time.Sleep(time.Second)
          c1 <- 1
      }()
      var v int
      if selectnbrecv(&v, c1) {
          fmt.Printf("%d <- c1\n", v)
      } else {
          fmt.Println("default")
      }
    }
      
    func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
      selected, _ = chanrecv(c, elem, false) // non-blocking receive
      return
    }
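
In user code the same non-blocking receive is usually wrapped in a small helper. tryRecv below is a hypothetical name for such a helper, not a standard library function.

```go
package main

import "fmt"

// tryRecv attempts a non-blocking receive. selected is false when the
// receive would have blocked, in which case v is the zero value.
func tryRecv(ch <-chan int) (v int, selected bool) {
	select {
	case v = <-ch:
		return v, true
	default:
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)

	v, ok := tryRecv(ch)
	fmt.Println(v, ok) // 0 false: nothing buffered, default branch taken

	ch <- 5
	v, ok = tryRecv(ch)
	fmt.Println(v, ok) // 5 true
}
```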
    
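  • Multiple channel cases. The underlying implementation of select is selectgo. The channels are operated on concurrently by several Gs, which compete fairly to modify the selectDone field of the same G: whichever channel modifies selectDone first is selected, and the others are discarded. When several cases are already ready at the time selectgo runs, it visits them in pollorder, which the runtime shuffles randomly, so the chosen case is random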

    func main() {
      c1 := make(chan int)
      c2 := make(chan int)
      go func() {
          time.Sleep(time.Second)
          c1 <- 1
      }()
      go func() {
          time.Sleep(time.Second)
          c2 <- 1
      }()
      select {
      case v := <-c1: // Main G joins c1's wait queue; its sudog has isSelect set to true
          fmt.Printf("%d <- c1\n", v)
      case v := <-c2: // Main G joins c2's wait queue; its sudog has isSelect set to true
          fmt.Printf("%d <- c2\n", v)
      }
      // Suppose select picks c2: Main G's selectDone is set to 1
      // Main G's sudog is still in c1's wait queue and must be ignored
    }
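
The "first to modify selectDone wins" rule is a compare-and-swap race. The sketch below reproduces only that idea with sync/atomic; race, done and winners are names invented here, and the code is not how the runtime itself is written.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// race lets n goroutines compete to flip done from 0 to 1 and returns
// how many of them won. Only one compare-and-swap can succeed.
func race(n int) int {
	var done uint32
	var winners int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if atomic.CompareAndSwapUint32(&done, 0, 1) {
				atomic.AddInt32(&winners, 1) // this goroutine "selected" its channel
			}
			// Losers see done == 1 and must leave the waiting G alone.
		}()
	}
	wg.Wait()
	return int(atomic.LoadInt32(&winners))
}

func main() {
	fmt.Println(race(8)) // 1
}
```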
    
    // runtime/chan.go
    func (q *waitq) dequeue() *sudog {
      for {
          ...
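          // If another channel has already been selected, this channel lost the race;
          // a G whose sudog has not been removed from this queue yet must be skipped.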
          if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
              continue
          }
      
          return sgp
      }
    }
      
    // runtime/select.go
    func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    ...
    loop:
      // pass 1 - look for something already waiting
      var dfli int
      var dfl *scase
      var casi int
      var cas *scase
      var recvOK bool
      for i := 0; i < ncases; i++ {
          casi = int(pollorder[i])
          cas = &scases[casi]
          c = cas.c
      
          switch cas.kind {
          case caseNil:
              continue
      
          case caseRecv:
              sg = c.sendq.dequeue()
              if sg != nil {
                  goto recv
              }
              if c.qcount > 0 {
                  goto bufrecv
              }
              if c.closed != 0 {
                  goto rclose
              }
      
          case caseSend:
              if raceenabled {
                  racereadpc(c.raceaddr(), cas.pc, chansendpc)
              }
              if c.closed != 0 {
                  goto sclose
              }
              sg = c.recvq.dequeue()
              if sg != nil {
                  goto send
              }
              if c.qcount < c.dataqsiz {
                  goto bufsend
              }
      
          case caseDefault:
              dfli = casi
              dfl = cas
          }
      }
      
      if dfl != nil {
          selunlock(scases, lockorder)
          casi = dfli
          cas = dfl
          goto retc
      }
      
      // pass 2 - enqueue on all chans
      gp = getg()
      if gp.waiting != nil {
          throw("gp.waiting != nil")
      }
      nextp = &gp.waiting
      for _, casei := range lockorder {
          casi = int(casei)
          cas = &scases[casi]
          if cas.kind == caseNil {
              continue
          }
          c = cas.c
          sg := acquireSudog()
          sg.g = gp
          sg.isSelect = true // the G enters the channel's wait queue because of selectgo
          // No stack splits between assigning elem and enqueuing
          // sg on gp.waiting where copystack can find it.
          sg.elem = cas.elem
          sg.releasetime = 0
          if t0 != 0 {
              sg.releasetime = -1
          }
          sg.c = c
          // Construct waiting list in lock order.
          *nextp = sg
          nextp = &sg.waitlink
      
          switch cas.kind {
          case caseRecv:
              c.recvq.enqueue(sg)
      
          case caseSend:
              c.sendq.enqueue(sg)
          }
      }
      
      // wait for someone to wake us up
      gp.param = nil
      gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
      gp.activeStackChans = false
      
      sellock(scases, lockorder)
      
      gp.selectDone = 0
      // The sudog that was woken. It belongs to one channel, i.e. the channel that woke the G.
      sg = (*sudog)(gp.param) 
      gp.param = nil
      
      // pass 3 - dequeue from unsuccessful chans
      // otherwise they stack up on quiet channels
      // record the successful case, if any.
      // We singly-linked up the SudoGs in lock order.
      casi = -1
      cas = nil
      sglist = gp.waiting
      // Clear all elem before unlinking from gp.waiting.
      for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
          sg1.isSelect = false
          sg1.elem = nil
          sg1.c = nil
      }
      gp.waiting = nil
      
      for _, casei := range lockorder {
          k = &scases[casei]
          if k.kind == caseNil {
              continue
          }
          if sglist.releasetime > 0 {
              k.releasetime = sglist.releasetime
          }
          if sg == sglist {
              // sg has already been dequeued by the G that woke us up.
              // This sudog woke the G, i.e. this is the channel that woke the G.
              casi = int(casei)
              cas = k
          } else {
              // sudogs still waiting on the other channels must be removed,
              // because those channels lost the race in this select.
              // Example:
              // select {
              // case x := <-ch1: // ch1's sudog must be removed
              // 	fmt.Println(x)
              // case x := <-ch2: // suppose ch2 woke the G
              // 	fmt.Println(x)
              // }
              c = k.c
              if k.kind == caseSend {
                  c.sendq.dequeueSudoG(sglist)
              } else {
                  c.recvq.dequeueSudoG(sglist)
              }
          }
          sgnext = sglist.waitlink
          sglist.waitlink = nil
          releaseSudog(sglist)
          sglist = sgnext
      }
      
      if cas == nil {
          // We can wake up with gp.param == nil (so cas == nil)
          // when a channel involved in the select has been closed.
          // It is easiest to loop and re-run the operation;
          // we'll see that it's now closed.
          // Maybe some day we can signal the close explicitly,
          // but we'd have to distinguish close-on-reader from close-on-writer.
          // It's easiest not to duplicate the code and just recheck above.
          // We know that something closed, and things never un-close,
          // so we won't block again.
          goto loop
      }
      ...
    }
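
The random choice among ready cases can be observed directly. In this sketch both channels always hold a value before select runs, so every iteration goes through pass 1 with two ready cases; countSelections is a helper name made up for the experiment, and the exact split between the two counters varies from run to run.

```go
package main

import "fmt"

// countSelections runs a two-case select n times with both channels ready
// and counts how often each case is chosen.
func countSelections(n int) (c1Hits, c2Hits int) {
	c1 := make(chan int, 1)
	c2 := make(chan int, 1)
	for i := 0; i < n; i++ {
		c1 <- 1
		c2 <- 2
		select {
		case <-c1:
			c1Hits++
			<-c2 // drain the case that was not chosen
		case <-c2:
			c2Hits++
			<-c1 // drain the case that was not chosen
		}
	}
	return c1Hits, c2Hits
}

func main() {
	a, b := countSelections(1000)
	fmt.Println(a, b) // both counts are typically close to 500
}
```

If select simply took the first ready case in source order, c2Hits would stay at zero.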
    

Uses of channels in Go concurrency scenarios

  • Generators

  • Services

  • Multiplexing

  • Listening on channels with select

  • Quit signals

  • Daisy-chain

  • Random number generators

  • Timers
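
As a rough illustration of three of these patterns together (a generator, multiplexing, and a quit signal), here is a small sketch. The function names generator, fanIn and sum are chosen for this example and do not come from the referenced articles.

```go
package main

import "fmt"

// generator emits 0..n-1 on the returned channel and then closes it.
// It stops early if done is closed (quit signal).
func generator(done <-chan struct{}, n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			select {
			case out <- i:
			case <-done:
				return
			}
		}
	}()
	return out
}

// fanIn multiplexes a and b onto one channel, closing it once both are drained.
func fanIn(done <-chan struct{}, a, b <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for a != nil || b != nil {
			var v int
			var ok bool
			select {
			case v, ok = <-a:
				if !ok {
					a = nil // a nil channel is never selected again
					continue
				}
			case v, ok = <-b:
				if !ok {
					b = nil
					continue
				}
			case <-done:
				return
			}
			select {
			case out <- v:
			case <-done:
				return
			}
		}
	}()
	return out
}

// sum adds up everything received until ch is closed.
func sum(ch <-chan int) int {
	total := 0
	for v := range ch {
		total += v
	}
	return total
}

func main() {
	done := make(chan struct{})
	defer close(done)
	// (0+1+2) + (0+1+2+3) = 9, regardless of interleaving.
	fmt.Println(sum(fanIn(done, generator(done, 3), generator(done, 4))))
}
```

Setting a drained input to nil is a design choice that keeps the select from spinning on a closed channel, since a receive on a nil channel never becomes ready.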

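References

A deep look at how go channel and select work (深入理解go-channel和select的原理)

Design patterns and application scenarios of Go concurrency (Go语言并发的设计模式和应用场景)

Understanding Channels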