Back

Go Context和Channel

context、channel、select 原理

[TOC]

Context

一个接口,包含如下方法,主要用于实现主协程对子协程的控制,作用包括取消执行、设置超时时间、携带键值对等

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
type Context interface {
    // 获取到期时间,如果没有,ok则返回false
	Deadline() (deadline time.Time, ok bool)
	// 返回一个chan,表示取消信号,如果通道关闭则代表该 Context 已经被取消;如果返回的为 nil,则代表该 Context 是一个永远不会被取消的 Context。
    Done() <-chan struct{}
	// 返回该 Context 被取消的原因。如果只使用 Context 包的 Context 类型的话,那么只可能返回 Canceled (代表被明确取消)或者 DeadlineExceeded (因超时而取消)
    Err() error
    // 获取Context中的键值对
	Value(key interface{}) interface{}
}

一个demo,引用:通知多个子goroutine退出运行

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
 package main
 
 import (
 	"context"
 	"crypto/md5"
 	"fmt"
 	"io/ioutil"
 	"net/http"
 	"sync"
	"time"
)

type favContextKey string

func main() {
	wg := &sync.WaitGroup{}
	values := []string{"https://www.baidu.com/", "https://www.zhihu.com/"}
	ctx, cancel := context.WithCancel(context.Background())

	for _, url := range values {
		wg.Add(1)
		subCtx := context.WithValue(ctx, favContextKey("url"), url)
		go reqURL(subCtx, wg)
	}

	go func() {
		time.Sleep(time.Second * 3)
		cancel()
	}()

	wg.Wait()
	fmt.Println("exit main goroutine")
}

func reqURL(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	url, _ := ctx.Value(favContextKey("url")).(string)
	for {
		select {
        // 调用Done方法检测是否有父节点调用cancel方法通知子节点退出运行, chan被close时触发
		case <-ctx.Done():
			fmt.Printf("stop getting url:%s\n", url)
			return
		default:
			r, err := http.Get(url)
			if r.StatusCode == http.StatusOK && err == nil {
				body, _ := ioutil.ReadAll(r.Body)
				subCtx := context.WithValue(ctx, favContextKey("resp"), fmt.Sprintf("%s%x", url, md5.Sum(body)))
				wg.Add(1)
				go showResp(subCtx, wg)
			}
			r.Body.Close()
			//启动子goroutine是为了不阻塞当前goroutine,这里在实际场景中可以去执行其他逻辑,这里为了方便直接sleep一秒
			// doSometing()
			time.Sleep(time.Second * 1)
		}
	}
}

func showResp(ctx context.Context, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			fmt.Println("stop showing resp")
			return
		default:
			//子goroutine里一般会处理一些IO任务,如读写数据库或者rpc调用,这里为了方便直接把数据打印
			fmt.Println("printing: ", ctx.Value(favContextKey("resp")))
			time.Sleep(time.Second * 1)
		}
	}
}

emptyCtx

go提供了两个基本的context创建,emptyCtx是int类型的重新定义,emptyCtx没有过期时间,不能被取消,不能设置value,作用仅作为context树的根节点。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
var (
  background = new(emptyCtx)
  todo       = new(emptyCtx)
)

func Background() Context {
  return background
}

func TODO() Context {
  return todo
}

cancelCtx

通过根context,比如emptyCtx之后,调用withCancel()方法可以创建cancelCtx用于取消操作。

有两种方式可触发取消:

  1. 返回的CancelFunc被调用,此时会取消当前context和其所有的子context
  2. Done这个chan被close了,此时会取消当前context和其所有的子context
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
type CancelFunc func()

// 创建一个可被取消的context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	c := newCancelCtx(parent)
    // 构建树形的cancel
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) } // canceled是一个error实现
}

func newCancelCtx(parent Context) cancelCtx {
	return cancelCtx{Context: parent}
}

// 主要是将child ctx与parent ctx绑定,放到parent的children属性中
func propagateCancel(parent Context, child canceler) {
	done := parent.Done()
	if done == nil {
		return // parent is never canceled
	}

	select {
	case <-done:
		// parent is already canceled
		child.cancel(false, parent.Err())
		return
	default:
	}

    // 获取parent的cancelCtx,ok是用来判断父context是不是CancelCtx类型的
	if p, ok := parentCancelCtx(parent); ok {
		p.mu.Lock()
		if p.err != nil {
			// parent已经被取消,触发child的取消
			child.cancel(false, p.err)
		} else {
            // parent没有被取消,把当前context作为parent的child
			if p.children == nil {
				p.children = make(map[canceler]struct{})
			}
			p.children[child] = struct{}{}
		}
		p.mu.Unlock()
	} else {
        // 表示parent的ctx不是一个cancelCtx,没有children属性,无法构建成树,只能通过parent的done来向下传播
		atomic.AddInt32(&goroutines, +1)
		go func() {
			select {
			case <-parent.Done():
				child.cancel(false, parent.Err())
			case <-child.Done():
			}
		}()
	}
}

可被取消的context实现了canceler接口,具体实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
type canceler interface {
	cancel(removeFromParent bool, err error)
	Done() <-chan struct{}
}

type cancelCtx struct {
	Context // 存储父context的指针

	mu       sync.Mutex
	done     chan struct{} // 作为取消信号的channel,子协程监听该channel判断是否要cancel
	children map[canceler]struct{} // 被关联的可被取消的context
	err      error                // 第一次取消时被设置
}

func (c *cancelCtx) Done() <-chan struct{} {
   c.mu.Lock()
   if c.done == nil {
      c.done = make(chan struct{})
   }
   d := c.done
   c.mu.Unlock()
   return d
}

func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	// 在向下传播cancel时, 必须带上原始的error
    if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // already canceled
	}
	c.err = err
	if c.done == nil {
		c.done = closedchan
	} else {
		close(c.done)
	}
	for child := range c.children {
		// 取消所有子context
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()

    // 从父context的children中移除当前context
	if removeFromParent {
		removeChild(c.Context, c)
	}
}

timerCtx

可超时自动取消的context,内部使用cancelCtx + timer实现,调用WithDeadline()方法可以创建timerCtx用于超时取消操作。

WithTimeout()方法WithDeadline()方法,效果是一样的,只是时间的含义不一样。

1
2
3
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}

关于withTimeout()方法,返回的cancel函数,即使不主动调用,也不影响资源的最终释放,它到时间了也会自动调用,建议是提前主动调用,尽快释放,避免等待时间过长导致浪费。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
  if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
    // 如果parent可以更早结束, 那么返回一个包装parent的cancelCtx
    return WithCancel(parent)
  }
  c := &timerCtx{
    // 组合一个新的cancelCtx
    cancelCtx: newCancelCtx(parent),
    deadline:  deadline,
  }
  propagateCancel(parent, c) // 组织树形结构
  d := time.Until(deadline)
  if d <= 0 {
    // 如果时间已经到了, 直接触发取消
    c.cancel(true, DeadlineExceeded)
    return c, func() { c.cancel(true, Canceled) }
  }
  c.mu.Lock()
  defer c.mu.Unlock()
  if c.err == nil {
    // 新建定时器, 到期触发取消
    c.timer = time.AfterFunc(d, func() {
      c.cancel(true, DeadlineExceeded)
    })
  }
  // 返回值还有用于直接取消的CancelFunc
  return c, func() { c.cancel(true, Canceled) 
}
    
type timerCtx struct {
  cancelCtx
  timer *time.Timer // Under cancelCtx.mu.

  deadline time.Time
}

valueCtx

valueCtx内部仍然使用Context存储父Context的指针,并用interface{}存储键值;

如果当前valueCtx找不到需要的key,会沿着树向上一直查找直到根节点,类似链表的搜索;

使用WithValue创建时,会判断key是否实现Comparable接口。如果没有实现,会触发panic

key的类类型不应该是内置类型,以避免冲突。使用的时候应该自定义类型;

Channel

Channel的设计基于CSP模型。

CSP模型(Communicating Sequential Process,通信顺序进程),允许使用进程组来描述系统,独立运行,并且只通过消息传递的方式通信。

本质上就是,在使用协程执行函数时,不通过内存共享(会用到锁)的方式通信,而是通过Channel通信传递数据。

动画参考:https://go.xargin.com/docs/data_structure/channel/

基本

  • chan是引用类型,使用make关键字创建,未初始化时的零值是nil,如

    ch := make(chan string, 10),创建一个能处理string的缓冲区大小为10的channel,效果相当于异步队列,除非缓冲区用完,否则不会阻塞;

    ch := make(chan string),则创建了一个不存在缓冲区的channel,效果相当于同步阻塞队列,len永远返回0。

    即 假如没有接收者,同一个方法内,连续发送两次数据,第一次如果没有被接收的话,此时就阻塞了,轮不到第二次发送,但如果size = 1,第一次发送的数据就会进入buf数组,不阻塞,到了第二次发送才阻塞;

    var ch chan int表示创建了一个nil channel;

  • channel作为通道,负责在多个goroutine间传递数据,解决多线程下共享数据竞争问题。

  • len()方法获取buff中未被读取的数量,即qcount的值;

    cap()方法获取buff数组的长度

  • 带有 <- 的chan是有方向的,不带 <- 的chan是双向的,比如

1
  chan string        // 双向chan,可以发送和接收string  chan<- struct{}    // 只能发送struct到chan中  <-chan int         // 只能从chan中接收int
  • chan可以是任何类型的,比如可以是 chan<- 类型,<-总是尽量和左边的chan结合,比如
1
chan<- chan int    // 等价于 chan<- (chan int)chan<- <-chan int  // 等价于 chan<- (<-chan int)<-chan <-chan int  // 等价于 <-chan (<-chan int)chan (<-chan int)  // 等价于 chan (<-chan int)
  • 接收数据时可以有两个返回值,第一个是返回的元素,第二个是bool类型,表示是否成功地从chan中读取到一个值。如果是false,说明chan已经被close并且chan中没有缓存的数据,此时第一个元素是零值。所以,如果接收时第一个元素是零值,可能是sender真的发送了零值,也可能是closed并且没有元素导致的,所以最好通过第二个返回值来确定。
  • 双向chan可以赋值给单向chan,但反过来不可以;
  • 给一个nil channel发送数据,会造成永久阻塞,从一个nil channel接收数据,会造成永久阻塞;
  • 给一个已经关闭的channel发送数据,会引起panic;
1
2
3
4
5
6
7
8
// 因为是无缓冲区的,只有当存在receiver的时候才能send成功,否则就一直阻塞,所以当close的时候,就会panic
ch := make(chan int)
go func() { ch <- 1 }() // panic: send on closed channel
time.Sleep(time.Second)
go func() { close(ch) }()
time.Sleep(time.Second)
x, ok := <-ch
fmt.Println(x, ok)
  • 从一个已经关闭的channel接收数据,如果缓冲区为空,则返回一个零值;

  • 已关闭的channel再次关闭,会panic;

  • 对于一个不关闭的channel,在方法结束后,只要channel没有被引用,会被GC自动回收;

  • 关闭channel的原则:不要向已关闭的channel发送数据或者再次关闭,关闭的动作尽量在sender做,主要还是分场景:

    • 一个sender一个receiver的场景:在sender处关闭。

    • 一个sender多个recevier的场景:在sender处关闭。

    • 多个sender一个receiver的场景:增加一个传递关闭信号的 channel,receiver 通过信号 channel 下达关闭数据 channel 指令。senders 监听到关闭信号后,停止发送数据。

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      
      	dataCh := make(chan int, 100)
          stopCh := make(chan struct{})
      	// senders
          for i := 0; i < NumSenders; i++ {
              go func() {
                  for {
                      select {
                      case <- stopCh:
                          // 接收关闭信号退出
                          return
                      case dataCh <- rand.Intn(Max):
                      }
                  }
              }()
          }
          // the receiver
          go func() {
              for value := range dataCh {
                  if value == Max-1 {
                      fmt.Println("send stop signal to senders.")
                      // 直接关闭
                      close(stopCh)
                      return
                  }
      
                  fmt.Println(value)
              }
          }()
      
    • 多个sender多个receiver的场景:再增加一个中间的channel,用来接收标识关闭的数据,收到后直接close传递关闭的信号channel即可。

       1
       2
       3
       4
       5
       6
       7
       8
       9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      
      	dataCh := make(chan int, 100)
          stopCh := make(chan struct{})
      
          // 当使用select发送数据到toStop时,一定要有buffer,防止中间channel没准备好而错失关闭时机的问题
          toStop := make(chan string, 1)
          var stoppedBy string
      
          // 中间channel,用于接收标识关闭的数据
          go func() {
              stoppedBy = <-toStop
              close(stopCh)
          }()
      
          // senders
          for i := 0; i < NumSenders; i++ {
              go func(id string) {
                  for {
                      value := rand.Intn(Max)
                      // 发送者也可以关闭
                      if value == 0 {
                          toStop <- "sender#" + id
                          return
                      }
      
                      select {
                      case <- stopCh: // 真正的停止
                          return
                      case dataCh <- value:
                      }
                  }
              }(strconv.Itoa(i))
          }
      
          // receivers
          for i := 0; i < NumReceivers; i++ {
              go func(id string) {
                  for {
                      select {
                      case <- stopCh:  // 真正的停止
                          return
                      case value := <-dataCh:
                          // 接收者也能进行关闭
                          if value == Max-1 {
                              case toStop <- "receiver#" + id:
                              return
                          }
      
                          fmt.Println(value)
                      }
                  }
              }(strconv.Itoa(i))
          }
      
  • channel在关闭时会自动退出循环;

1
2
3
4
ch := make(chan int, 100)
for elem := range ch {
	fmt.println(elem)
}
  • 注意channel不提供跨goroutine的数据保护,如果多个channel传递一份数据的指针,使得每个goroutine可以操作同一份数据,也会出现并发安全问题;
nil empty full not full & empty closed
receive block block read value read value 返回未读的元素,读完后返回零值
send block write value block writed value panic
close panic closed,没有未读元素 closed,保留未读元素 closed,保留未读元素 panic

数据结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type hchan struct {
	qcount   uint   // 已经接收但还没被取走的元素个数,即channel中的循环数组的元素个数
    dataqsiz uint   // channel中的循环数组的长度, ch:=make(chan int, 10), 就是这个10
	buf      unsafe.Pointer // channel中缓冲区数据指针,buf是一个循环数组,buf的总大小是elemsize的整数倍
	elemsize uint16 // 当前channel能够收发的元素大小
	closed   uint32
	elemtype *_type // 当前channel能够收发的元素类型
	sendx    uint   // 指向底层循环数组buf,表示当前可发送的元素位置的索引值,当sendx=dataqsiz时,会回到buf数组的起点,一旦接收新数据,指针就会加上elemsize,移向下个位置
	recvx    uint   // 指向底层循环数组buf,表示当前可接收的元素位置的索引值
	recvq    waitq  // 等待接收队列,存储当前channel因缓冲区空间不足而阻塞的goroutine列表,双向链表
	sendq    waitq  // 等待发送队列,存储当前channel因缓冲区空间不足而阻塞的goroutine列表,双向链表

	lock mutex  // 互斥锁,保证每个读channel或写channel的操作都是原子的,保护hchan和sudog上的字段。
    // 持有lock时,禁止更改另一个G的状态(比如不要把状态改成ready),防止死锁
}

// 双向链表
// sudog表示goroutine,是对goroutine的一层封装,代表一个在等待队列中的G
// 一个G可以出现在多个等待队列上,因此一个G可以有多个sudog
type waitq struct {
	first *sudog
	last  *sudog
}

type sudog struct {
    g    *g
    next *sudog
    prev *sudog
    elem unsafe.Pointer
}

初始化

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func makechan(t *chantype, size int) *hchan {
    ...
    elem := t.elem
    // 略去检查代码,检查数据项大小是否超过64KB,是否有错误的内存对齐,缓冲区大小是否溢出
    ...

    var c *hchan
    switch {
    case mem == 0:
      // chan的size或者元素的size是0,不必创建buf
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      c.buf = c.raceaddr()
    case elem.ptrdata == 0:
      // 元素不是指针,分配一块连续的内存给hchan数据结构和buf
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
            // hchan数据结构后面紧接着就是buf
      c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
      // 元素包含指针,那么单独分配buf
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
    }
  
    // 元素大小、类型、容量都记录下来
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    return c
  }

发送数据

使用ch <- "test"发送数据,最终会调用chansend()函数发送数据,该函数设置了阻塞参数为true;

  1. 如果chan是nil,则把发送者的goroutine park(阻塞休眠),此时发送者将被永久阻塞;

  2. 如果chan没有被close,但是chan满了,则直接返回false,但是由于阻塞参数为true,这部分不会被执行;

  3. 上锁,保证线程安全,再次检查chan是否被close,如果被close,再往里发数据会触发 解锁,panic

  4. 同步发送 - 优先发送给等待接收的G

    如果没被close,当recvq存在等待的接收者时,通过send()函数,取出第一个等待的goroutine,直接发送数据,不需要先放到buf中;

    send()函数将因为等待数据的接收而阻塞的goroutine的状态从Gwaiting或者Gscanwaiting改为Grunnable,把goroutine绑定到P的LRQ中,等待下一轮调度时会立即执行这个等待发送数据的goroutine;

  5. 异步发送 - 其次是发送到buf区

    当recvq中没有等待的接收者,且buf区存在空余空间时,会使用chanbuf()函数获取sendx索引值,计算出下一个可以存储数据的位置,然后调用typedmemmove()函数将要发送的数据拷贝到buff区,增加sendx索引和qcount计数器,完成之后解锁,返回成功;

  6. 阻塞发送 - 最后才保存在待发送队列,阻塞(阻塞只发生在这里,此时G和M分离)

    当recvq中没有等待的接收者,且buf区已满或不存在buf区时,会先调用getg()函数获取正在发送者的goroutine,执行acquireSudog()函数获取sudoG对象,设置此次阻塞发送的相关信息(如发送的channel、是否在select控制结构中和待发送数据的内存地址、发送数据的goroutine)

    然后将该sudoG对象加入sendq队列,调用goparkunlock()函数让当前发送者的goroutine进入等待状态,表示当前goroutine正在等待其他goroutine从channel中接收数据,等待调度器唤醒;

    此时len()返回值为0,数据的发送是阻塞在方法中的。

    调度器唤醒后,将一些属性值设置为零,并释放sudog对象,表示向channel发送数据结束;

channel发送数据时涉及两次goroutine的调度

  1. 当接收队列里存在sudoG可以直接发送数据时,执行goready()函数,将G从Gwaiting或GScanwaiting转为Grunnable,等待下次调度触发,交由M执行;
  2. 当没有等待接收数据的G,并且没有缓冲区,或者缓冲区已满时,执行gopark()函数挂起当前G,将G阻塞,此时状态为Gwaiting,让出CPU等待调度器调度;
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// ep指的是用来发送数据的内存指针,数据类型与hchan中的类型一致
// 返回值表示带发送的数据是否 send 成功,即是否被接受,比如进buff或者被接收者接收;
// ch <- [val] 时,block=true; select时,block=false
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// 如果chan是nil,则把发送者的goroutine park(阻塞休眠),此时发送者将被永久阻塞
    if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}

    // 下面代码进针对select场景
    // 当chan不为null,且没被close,full方法判断chan发送是否阻塞,是则直接返回true
    // full方法有两种情况判断是否可发送
    // 1. 如果hchan.dataqsiz=0,说明是阻塞队列,如果此时hchan.recvq.first==nil,说明没有接收者,发送阻塞
    // 2. 比较hchan.qcount是否等于hchan.dataqsiz,如果是说明chan已满,发送阻塞
	if !block && c.closed == 0 && full(c) {
		return false
	}
    
    // 上锁
	lock(&c.lock)
	// 如果chan被关闭,再往里发送数据就会解锁,然后panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}
	// 如果chan没关闭,获取接收者等待队列中的第一个G开始发送数据
	if sg := c.recvq.dequeue(); sg != nil {
		// G存在,调用send函数,send函数主要完成两件事
        // 1. 调用sendDirect()函数将数据拷贝到接收变量的内存地址上
        // 2. 调用goready()函数将等待接收的阻塞G的状态从Gwaiting或者Gscanwaiting改为Grunnable,把G绑定到P的LRQ中,下一轮调度时会唤醒这个等待接收数据的G立即执行。
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	// 当recvq中没有等待接收数据的G,且chan的缓冲区还有空间时
	if c.qcount < c.dataqsiz {
        // 调用chanbuf获取sendx索引的元素的指针,;
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
        // 调用typedmemmove()将要发送的数据拷贝到缓冲区buf
		typedmemmove(c.elemtype, qp, ep)
        // 然后增加sendx索引和qcount计数器的值
		c.sendx++
		if c.sendx == c.dataqsiz {
            // 因为buf缓冲区是环形,如果索引到了队尾,则置0重新回到队头
			c.sendx = 0
		}
		c.qcount++
        // 完成后就解锁,返回成功
		unlock(&c.lock)
		return true
	}

    // 能过到这边,说明没有等待接收数据的G,并且没有缓冲区,或者缓冲区已满,此时进入阻塞发送
	if !block {
		unlock(&c.lock)
		return false
	}

	// 获取当前goroutine
	gp := getg()
    // 获取一个sudo G;acquireSudog()方法主要是获取可复用的sudoG对象,会优先从本地缓存获取,获取不到就会从全局缓存中获取,追加到本地缓存,如果全局缓存也没有,则新创建一个sudoG
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// 为sudo G设置好要发送的数据和状态,比如发送的Channel、是否在select中和待发送的数据的内存地址等
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
    // 将sudo G加入待发送队列
	c.sendq.enqueue(mysg)
    // 调用gopark方法挂起当前goroutine,状态为waitReasonChanSend,阻塞等待channel
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
	// 确保发送的值保存活动状态,直到接收者将其复制出来。因为sudoG具有指向堆栈对象的指针,但其不能作为GC时的root对象。发送的数据是分配在堆上的,避免被GC。
	KeepAlive(ep)

	// 当goroutine被唤醒后,解除阻塞状态,完成channel阻塞数据的发送
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
    // 发送完成之后,解除channel的绑定,重置sudoG状态,释放sudoG,释放时,如果本地缓存已满,会转移一部分到全局缓存,否则放到本地缓存等待被复用
	releaseSudog(mysg)
	return true
}

接收数据

使用 str <- ch 或 str, ok <- ch (ok用于判断ch是否关闭,如果没有ok,可能会无法分辨str接收到的零值是发送者发的还是ch关闭)接收数据,会转化为调用chanrecv1和chanrecv2函数,但最终会调用chanrecv函数接收数据。chanrecv1和chanrecv2函数都是设置阻塞参数为true。

  1. 如果chan是nil,则把接收者的goroutine park(阻塞休眠),接收者被永久阻塞;

  2. 不上锁检查 buf 区大小:如果chan的buf区大小为0 或者 没有数据可接收,检查是否被关闭,被关闭则返回;如果没被关闭,则再次检查buf区大小是否为0 或者 没有数据可接收,如果是,则清除ep指针中的数据并返回selected为true,received为false;

    这里两次empty检查,因为第一次检查,chan可能还没关闭,但是第二次检查时关闭了,由于可能在两次检查之间有待接收的数据达到了,所以需要两次empty检查;

  3. 上锁检查buf区大小:上锁,如果chan已经被close,且buf区没有数据,清除ep指针中的数据,解锁,返回selected为true,received为false;

  4. 同步接收 - 如果无buf,消费发送等待队列中G的数据,如果buf满,先拿buf区的,发送的再加入

    当chan的sendq队列存在等待状态的goroutine时(能拿到就说明要不就是buf区为0,要不就是buf区已满)

    如果是无buf区的chan,直接使用recv()函数从阻塞的发送者中获取数据;

    如果是有buf区的chan,说明此时buf区已满,则先从buf区中获取可接收的数据(从buf区中copy到接收者的内存),然后从sendq队列的队首中读取待发送的数据,加入到buf区中(将发送者的数据copy到buf区,替换刚刚buf区copy出去的位置),更新可接收和可发送的下标chan.recvx和sendx的值;

    最后调用goready()函数将等待发送数据而阻塞gorouotine的状态从Gwaiting 或者 Gscanwaiting 改变成 Grunnable,把goroutine绑定到P的LRQ中,等待下一轮调度时立即释放这个等待发送数据的goroutine;

  5. 异步接收 - 其次是消费buf区中的数据

    当channel的sendq队列没有等待状态的goroutine,且buf区存在数据时,从channel的buf区中的recvx的索引位置接收数据,如果接收数据的内存地址不为空,会直接将缓冲区里的数据拷贝到内存中,清除buf区中的数据,递增recvx,递减qcount,完成数据接收;

    这个和chansend共用一把锁,所以不会有并发问题;

  6. 阻塞接收 - 最后才是保存在接收等待队列,阻塞(阻塞只发生在这里,此时G和M分离)

    当channel的sendq队列没有等待状态的goroutine,且buf区不存在数据时,执行acquireSudog()函数获取sudoG对象,设置此次阻塞发送的相关信息(如发送的channel、是否在select控制结构中和待发送数据的内存地址、发送数据的goroutine)

    然后将该sudoG对象加入待发送recvq队列,调用goparkunlock()函数让当前接收者的goroutine进入等待状态,表示当前goroutine正在等待其他goroutine从channel中发送数据,等待调度器唤醒;

    此时方法会阻塞在ch的接收中,len()返回值为0;

    goroutine被唤醒后,chan完成阻塞数据的接收,接收完成后进行基本的参数检查,解除chan的绑定,释放sudoG,表示接收数据完成;

channel 接收过程中包含 2 次有关 goroutine 调度过程

  1. 当发送队列中存在 sudoG 时,调用goready(),G 的状态从 Gwaiting 或者 Gscanwaiting 改变成 Grunnable,等待下次调度便立即运行;
  2. 当 buf 区为空,且没有发送者时,调用 gopark()挂起当前G,此时状态为Gwaiting,让出 cpu 的使用权并等待调度器的调度;
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
// ep指的是用来接收数据的内存指针,数据类型与hchan中的类型一致
// [val] <- ch 时,block=true; select时,block=false
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}
	// 如果chan是nil,接收者会被阻塞,gopark会引起waitReasonChanReceiveNilChan原因的休眠,并抛出unreachable的错误
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
    // 这段代码仅针对select的场景
	// 当chan不为nil,在没有获取锁的情况下,检查chan的buf区大小和是否存在可接收数据
    // empty方法是原子检查,检查chan.dataqsiz、chan.qcount是否为0,发送队列是否为空
	if !block && empty(c) {
		if atomic.Load(&c.closed) == 0 {
			return
		}
        // 这里两次empty检查,因为第一次检查,chan可能还没关闭,但是第二次检查时关闭了,由于可能在两次检查时有待接收的数据达到了,所以需要两次empty检查
		if empty(c) {
			if raceenabled {
				raceacquire(c.raceaddr())
			}
            // 如果chan的buf区大小和是否存在可接收数据,此时会清除ep指针中的数据
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}
	
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
    // 获取锁后,再检查一遍
	lock(&c.lock)
    // 如果chan已经关闭且buf区不存在数据了,则清理ep指针中的数据并返回
    // 这里也是从已经关闭的chan中读数据,读出来的是该类型零值的原因
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}

    // 从发送队列队首中找到等待发送的goroutine(能拿到就说明要不就是buf区为0,要不就是buf区已满)
    // 如果buf区大小为0,则直接接收数据;
    // 否则,说明buf区已满,先从buf区中获取要发送的数据,再将sender的数据加入到buf区中,更新可接收和可发送的下标chan.recvx和sendx的值
	if sg := c.sendq.dequeue(); sg != nil {
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}

    // 当chan的buf区存在数据时,直接从buf区中获取数据,进行接收,更新接收数据的下标值,解锁
	if c.qcount > 0 {
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// 到了这里,说明sendq里没有待发送的goroutine,且buf区也没有数据
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
    // 设置好待接收的sudoG后,加入待发送的等待队列
	c.recvq.enqueue(mysg)
    // 挂起当前goroutine,状态设置为waitReasonChanReceive,阻塞等待chan
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
	// goroutine被唤醒后,完成chan阻塞数据的接收,解除chan的绑定释放sudoG
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}


// 这里sg指的是等待发送队列中的G,ep指其携带的数据
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if ep != nil {
			// 从 sender 里面拷贝数据
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
	    // 这里对应 buf 满的情况
		qp := chanbuf(c, c.recvx)
		// 将数据从 buf 中拷贝到接收者内存地址中
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// 将数据从 sender 中拷贝到 buf 中
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

关闭

  1. 如果 chan 为 nil,close 会 panic;

  2. 上锁:

    如果 chan 已经 closed,再次 close 也会 panic;

    否则的话,如果 chan 不为 nil,chan 也没有 closed,设置chan的标记为closed;

  3. 优先释放所有的接收者

    将接收者等待队列中的sudoG对象加入到待清除队列glist中,这里会优先回收接收者,这样即使从close中的chan读取数据,也不会panic,最多读到默认值;

    这样第6步执行的时候,才会先执行接收者,接收后面发送者的数据(接收buff数组里的数据,因为sender里的会被panic掉),否则发送者发送的数据无法被先接收。

  4. 其次是释放所有发送者: 将发送者等待队列中的sudoG对象加入到待清除队列glist中,这里可能会发生panic,因为往一个close的chan中发送数据会panic;

  5. 解锁

  6. 进行最后的调度,遍历glist中的sudoG,调用goready()触发调度,将每个goroutine状态从 Gwaiting 转为 Grunnable状态,等待调度器调度;

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}
	c.closed = 1
	var glist gList
	// 释放所有接收者:将所有接收者的sudoG等待队列加入到待清除的队列glist中
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
        // 销毁资源
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	// 释放所有发送者:将所有发送者的sudoG等待队列加入到待清除的队列glist中
    // 如果发送者队列存在发送者,那这些发送者所在的goroutine会产生panic
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
        // 销毁资源
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// 为所有被阻塞的 goroutine 调用 goready 触发调度。将所有 glist 中的 goroutine 状态从 _Gwaiting 设置为 _Grunnable 状态,等待调度器的调度
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		goready(gp, 3)
	}
}

应用场景

  • 实现生产者 - 消费组模型,数据传递,比如worker池的实现

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    func consumer(taskChan <-chan int) {
        for i := 0; i < 5; i++ {
            go func(id int) {
                for {
                    task := <- taskChan
                    time.Sleep(time.Second) // 模拟耗时
                }
            }(i)
        }
    }
    
    func main() {
        taskCh := make(chan int, 100)
        go consumer(taskCh)
        // 生产者
        for i := 0; i < 10; i++ {
            taskCh <- i
        }
        // wait...
    }
    
  • 信号通知:利用 如果chan为空,那receiver接收数据的时候就会阻塞等待,直到chan被关闭或有新数据进来 的特点,将一个协程将信号(closing、closed、data ready等)传递给另一个或者另一组协程,比如 wait/notify的模式。

  • 协程池,把要操作的逻辑封装成task,通过chan传输实现协程复用

  • 任务编排:让一组协程按照一定的顺序并发或串行执行,比如实现waitGroup的功能

  • 控制并发量,可以配合WaitGroup进行控制goroutine的数量

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    
    func main() {
        ch := make(chan int, 3)
        for i := 0; i < 10; i++ {
    // 这个放在外层和放在里层的效果不同,都可以控制并发量,但是前者会阻塞for循环,后者不会
            ch <- 1   // 放满三个后就会阻塞循环,此时最多存在3个goroutine
          go func(k int) {
              fmt.Println(k) // do something...
              time.Sleep(time.Second)
              // 防止泄露
              defer func() {
                  <- ch
              }()
          }(i)
        }
    }
    
    // 配合WaitGroup控制goroutine的数量
    func main() {
        ch := make(chan int, 3)
        wg := sync.WaitGroup{}
        for i := 0; i < 10; i++ {
            ch <- 1 // ch如果放在里层就达不到控制goroutine的效果了
            wg.Add(1)
          go func(k int) {
                defer wg.Done()
                // ch<- 1, ch如果放在这,那就只阻塞goroutine里的逻辑,goroutine还是会创建多个
              fmt.Println(k) // do something...
              time.Sleep(time.Second)
                // 防止泄露
              defer func() {
                  <- ch
              }()
          }(i)
        }
        wg.Wait()
    }
    
  • 任务定时

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    func worker() {
        ticker := time.Tick(1 * time.Second)
        for {
            select {
            case <- ticker:
                // 执行定时任务
                fmt.Println("执行 1s 定时任务")
            }
        }
    }
    // 或者
    func worker() {
        for {
            select {
              case <-time.After(100 * time.Millisecond):
              case <-s.stopc:
                  return false
          }
        }
    }
    
  • 实现互斥锁的机制,比如,容量为 1 的chan,放入chan的元素代表锁,谁先取得这个元素,就代表谁先获取了锁

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    
    type Locker struct {
        ch chan int
    }
    
    func NewLocker() *Locker {
        locker := &Locker{ch: make(chan int, 1)}
        locker.ch <- 1
        return locker
    }
    
    func (locker *Locker) Lock() {
        <- locker.ch
    }
    
    func (locker *Locker) UnLock() {
        select {
            case locker.ch <- struct{}{}: 
          default: 
              panic(" unlock of unlocked mutex") 
        }
    }
    

共享资源的并发访问使用传统并发原语;

复杂的任务编排和消息传递使用 Channel;

消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond;

简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;

需要和 Select 语句结合,使用 Channel;需要和超时配合时,使用 Channel 和 Context。

注意点:使用chan要注意panic和goroutine泄露,另外,只要一个 chan 还有未读的数据,即使把它 close 掉,你还是可以继续把这些未读的数据消费完,之后才是读取零值数据。

在使用chan和select配合时要注意会出现goroutine泄漏的情况:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func process(timeout time.Duration) bool {
    ch := make(chan bool)

    go func() {
        // 模拟处理耗时的业务
        time.Sleep((timeout + time.Second))
        ch <- true // block
        fmt.Println("exit goroutine")
    }()
    // 如果上面的协程任务处理的时间过长,触发下面select的超时机制,此时process函数返回,之后当上面的协程任务执行完之后,由于process已经执行完,下面result接收chan的值被回收,所以没有接收者,导致上面的协程任务一直卡在 ch <- true,进而导致goroutine泄漏。解决方案就是使用容量为1的ch即可。
    select {
    case result := <-ch:
        return result
    case <-time.After(timeout):
        return false
    }
}

select

结构

select在runtime中不存在结构体表示,但是case倒是有

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
const (
    // scase.kind,到时select就是轮询判断这些类型的
    // send 或者 recv 发生在一个 nil channel 上,就有可能出现这种情况
    caseNil = iota
    caseRecv
    caseSend
    caseDefault
)

// select 中每一个 case 的数据结构定义
type scase struct {
	elem unsafe.Pointer // 接收 或 发送数据的变量地址
	c    *hchan         // 存储正在使用的chan
	kind uint16         // case的种类

	releasetime int64
	pc          uintptr // return pc (for race detector / msan)
}

基本

  • 非阻塞收发:当chan中存在可接收数据,直接处理那个chan,否则执行default语句
1
2
3
4
5
6
7
ch := make(chan int)
    select {
    case i := <-ch:
        println(i)
    default:
        println("default")
    }
  • 随机执行:select遇到多个case就绪,会进行随机选择
1
2
3
4
5
6
7
8
ch1 := make(chan int)
ch2 := make(chan int)
    select {
    case j := <-ch1:
        println(j)
    case i := <-ch2:
        println(i)
    }

编译器会对select中的case进行优化,总共有四种情况:

  1. 不包含任何case,即空select,此时会阻塞当前的goroutine

  2. 只包含一个case,此时select会被优化成 if ,当chan没有数据可接收时,就会把阻塞当前goroutine,直到有数据到来;如果chan是nil,就会永远阻塞当前goroutine

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
select {
case v, ok <-ch:
    // ...    
}
// 会被优化成
if ch == nil {
    block()
}
v, ok := <-ch
// ...
  1. 存在两个case,其中一个是default:

    • 发送,这种情况下,发送是不阻塞的:
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    
    ch := make(chan int, 1) // 一定要1及以上,才先会走case,如果是0会死锁,直接走default,
    select {
    case ch <- i:
        // ...
    default:
        // ...
    }
    // 底层调用的是chansend(c, elem, false, getcallerpc()),这里的阻塞参数是false, 表示这次发送不会阻塞
    if selectnbsend(ch, i) {
        // ...
    } else {
        // ...
    }
    
    • 接收,这种情况下,chan有值就走case,否则走default
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    
      select {
      case v <- ch: // case v, received <- ch:
          // ...
      default:
          // ...
      }
    
      if selectnbrecv(&v, ch) { // if selectnbrecv2(&v, &received, ch) {
          // ...
      } else {
          // ...
      }
    
    • 通用的select条件:比如select里包含多个case,会编译成通过runtime的selectgo方法处理case,selectgo会返回 case的序号 还有 是否被接收的标识,然后被编译成多个if,用于判断选中哪个case。
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    
      selv := [3]scase{}
      order := [6]uint16
      for i, cas := range cases {
          c := scase{}
          c.kind = ...
          c.elem = ...
          c.c = ...
      }
      chosen, revcOK := selectgo(selv, order, 3)
      if chosen == 0 {
          // ...
          break
      }
      if chosen == 1 {
          // ...
          break
      }
      if chosen == 2 {
          // ...
          break
      }
    

selectgo的流程

  1. 获取case数组,随机打乱,确定打乱后的轮询顺序数组pollorder和加锁顺序数组lockorder,数组里存放的元素是chan

  2. 按加锁顺序数组,调用chan的锁,依次进行锁定

  3. 进入主循环,遍历 轮询顺序数组pollorder

第一阶段,查找是否已经存在准备就绪的chan(此时的chan可以执行收发操作)此时需要处理四种类型的case:

  1. 当case不包含chan时,直接跳过;

  2. 当case会从chan中接收数据时:

  • 如果当前chan的sendq队列上有等待的goroutine,就会跳到 recv标签,如果没有buf区,则从sendq队列上获取数据,否则,从chan的buf区读取数据后,将sendq队列中等待的goroutine中的数据放入到buf区中相同的位置;
  • 如果当前chan的buf区不为空,就跳到bufrecv标签,从chan的buf区中获取数据
  • 如果当前chan已经被关闭,就会跳到 rclose标签 做一些清除的收尾工作;
  1. 当case会从chan中发送数据时:
  • 如果当前chan已经被关闭,会直接跳到 sclose标签,触发panic;
  • 如果当前chan的recvq队列上有等待的goroutine,就跳到 send标签 向chan发送数据;
  • 如果当前chan的缓冲区存在空闲位置,就会将等待发送的数据存入缓冲区中,因为select相当于有接收者了,不会出现发送阻塞的情况;
  1. 当case是default时,表示前面的所有case都没有被执行,此时会解锁所有的chan并返回(意味着当前select结构的收发都是非阻塞的),直接执行default内容;

第一阶段只是查找所有case中是否有可以立即被处理的chan,无论是数据是在等待的goroutine上,还是buf区中,只要存在数据满足条件就会立即处理,然后返回;如果不能立刻找到活跃的chan,就会进入下一循环;

第二阶段,将当前goroutine加入到chan对应的收发队列上并等待其他goroutine的唤醒:

  • 将当前goroutine,包装成sudogo,遍历case,加入到case的chan的sendq队列或者recvq队列中(同时,这个sudog会关联当前case的chan,然后将这些sudog组成链表,挂在当前goroutine下,用于唤醒之后的查找)

  • 调用gopark函数挂起当前goroutine,等待被调度器唤醒;

第三阶段,当前goroutine被唤醒后,找到满足条件的chan并进行处理:

  • 等到select对应的chan准备好后,当前goroutine会被调度器唤醒,被唤醒后,获取当前goroutine的sudog,依次对比所有case里chan对应的sudog结构,找到被唤醒的case,并释放其他未被使用的sudog结构;
  • 由于当前的select结构已经被挑选了其中一个case执行,剩下的case中没有被用到的sudog会被直接忽略并释放掉,为了不影响chan的正常使用,还需要将这些废弃的sudog从chan中出队;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
    order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

    scases := cas1[:ncases:ncases]
    // 轮询顺序数组
    pollorder := order1[:ncases:ncases]
    // 加锁顺序数组
    lockorder := order1[ncases:][:ncases:ncases]

    for i := range scases {
        cas := &scases[i]
        if cas.c == nil && cas.kind != caseDefault {
            *cas = scase{}
        }
    }

    // 根据chan的地址排序
    for i := 1; i < ncases; i++ {
        // 随机轮询,避免chan饿死
        j := fastrandn(uint32(i + 1))
        pollorder[i] = pollorder[j]
        pollorder[j] = uint16(i)
    }
    // 按照之前生成的加锁顺序锁定 select 语句中包含所有的 Channel
    sellock(scases, lockorder)

    // ...后面太长就不贴了
}

Timer

https://www.cyhone.com/articles/analysis-of-golang-timer/

时间轮

概念理解:一张图理解Kafka时间轮

手把手教你如何用golang实现一个timewheel时间轮

Go语言中时间轮的实现

总结:

通过DelayQueue(优先级队列实现,队列里的每个元素,都是某一个具体时间的list) + 环形数组(数组的每个元素是个list,索引代表时间格)

DelayQueue会根据环形数组中的每个元素进行排序;

添加任务时,判断任务执行时间,加入环形数组中,对应的环形数组的元素(list),加入DelayQueue中。,

Built with Hugo
Theme Stack designed by Jimmy