Go-Channel

Channel是Go的核心数据结构和Goroutine之间的通信方式,支持Go的高性能并发编程模型。

设计原理

不通过共享内存的方式通信,而是通过通信的方式共享内存,采用的并发模式为:CSP(通信顺序进程)。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。

怎么理解“不通过共享内存的方式通信,而是通过通信的方式共享内存”这句话呢?

前半句指通过 sync 包里的一些组件进行并发编程;后半句指使用 channel 进行并发编程。实际上,channel 的底层就是通过 mutex 来控制并发的。只是 channel 是更高一层次的并发编程原语,封装了更多的功能。

上图中的两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信

FIFO

Channel的收发操作遵循“先进先出”:

  • 先从 Channel 读取数据的 Goroutine 会先接收到数据;
  • 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利。

无锁管道

锁作为一种并发控制技术,通常分为:乐观锁和悲观锁。无锁队列对应使用乐观锁的队列。

  • 乐观锁:本质是基于验证的协议,使用原子指令CAS在多线程之间同步数据。无锁队列的实现也依赖这一原子指令。

Channel在运行时的内部表示是runtime.hchan,包含互斥锁,因此是一个用于同步和通信的有锁队列。

然而,锁导致的休眠和唤醒将带来额外的上下文切换;如果临界区过大,加锁解锁导致的额外开销,将成为性能瓶颈。Go社区曾提出了无锁Channel实现方案,分为以下3种类型:

  1. 同步Channel(不带缓冲):,发送方直接将数据交给Handoff接收方;
    • 同步模式下,当且仅当发送方和接收方都ready的情况下,数据才能传输(本质上是内存拷贝);否则,任意一方先行进行发送或接收操作,都会被挂起,等待另一方的出现才能被唤醒。
  2. 异步Channel(带缓冲):基于环形缓存的传统生产者消费者模型;
    • 缓冲槽有剩余容量的情况下,发送和接收操作都可以顺利进行。否则,操作的一方(如写入)同样会被挂起,直到出现相反操作(如接收)才会被唤醒。
  3. chan struct{}类型的异步Channel:struct{}类型不占用内存空间,无需实现缓冲区和直接发送Handoff的语义。

这个提案的目的也不是实现完全无锁的队列,只是在一些关键路径上通过无锁提升 Channel 的性能。

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type hchan struct {
qcount uint // Channel中的元素个数
dataqsiz uint // Channel中的循环队列长度
buf unsafe.Pointer // Channel的缓冲区数据指针
elemsize uint16
closed uint32
timer *timer // Channel的定时器
elemtype *_type
sendx uint // Channel 的已发送元素在缓冲区循环数组中的索引
recvx uint // Channel 的已接收元素在缓冲区循环数组中的索引
recvq waitq // 等待接收的 Goroutine 列表
sendq waitq // 等待发送的 Goroutine 列表
bubble *synctestBubble

/*
lock 保护 hchan 中的所有字段,以及在该通道上被阻塞的多个 sudog 字段。
在持有此锁的情况下,切勿更改其他 G 的状态(特别是不要将一个 G 设置为就绪状态),因为这可能导致与堆栈收缩相关的死锁
*/
lock mutex // 用于保证每个读/写Channel的操作都是原子的
}

sendqrecvq 存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表 runtime.waitq 表示,链表中所有的元素都是 runtime.sudog 结构(表示一个在等待列表中的 Goroutine):

1
2
3
4
type waitq struct {
first *sudog
last *sudog
}

创建Channel

Channel 是一个引用类型,所以在被初始化之前,它的值为nil;所有 Channel 的创建都会使用 make 关键字进行初始化。例如:make(chan int, 10)。传递的int值代表Channel缓冲区的容量,构造的是一个缓冲Channel;不传int值或者传0的,构造的是一个非缓冲Channel。

1
2
3
4
// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)

经过一系列编译步骤(为啥不写,因为我半天没弄懂),用于运行时创建chan的函数是makechan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))

// 省略一些元素大小、内存对齐、缓冲区大小的检查

// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// 当前Channel不存在缓冲区:只会为 runtime.hchan 分配一段内存空间
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case !elem.Pointers():
// 如果当前 Channel 中存储的类型不是指针类型:会为当前的 Channel 和底层的数组分配一块连续的内存空间;
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 默认情况:单独为 runtime.hchan 和缓冲区分配内存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 更新字段
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
......
return c
}

发送数据

当需要向Channel发送数据时,使用ch <- i语句,实际上调用runtime.chansend。如果在调用时将 block 参数设置成 true,那么表示当前发送操作是阻塞的:

1
2
3
4
5
6
7
8
9
10
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 在发送数据前为当前Channel加锁,防止多个线程并发修改数据
lock(&c.lock)
// 检查Channel是否已关闭
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
......
}

分为3个部分:(针对接收方状态)

  1. 当接收队列存在等待的Goroutine 时:通过 runtime.send 直接将数据发送给阻塞的接收者;
  2. 第1条不满足,且缓冲区存在剩余空间时:将发送的数据写入 Channel 的缓冲区;
  3. 第1,2条不满足,且不存在缓冲区或者缓冲区已满时:等待其他 Goroutine 从 Channel 接收数据。

直接发送

如果目标Channel没有被关闭,且已经有处于读等待的 Goroutine:从接收队列 recvq 中取出最先陷入等待的 Goroutine ,并直接向它发送数据:

1
2
3
4
5
6
7
8
9
10
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
......
}

发送数据时调用 runtime.send,分为2个部分:

  1. 调用 runtime.sendDirect 将发送的数据直接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上;
  2. 调用 runtime.goready 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable, 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方
    • 注:发送数据的过程只是将接收方的 Goroutine 放到了处理器的 runnext 中,程序没有立刻执行该 Goroutine。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
/* sg 是接收方的协程;ep 是发送方传递的数据指针;*/
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g // 获取与 sg(接收方协程)关联的 Goroutine
unlockf() // 解锁通道
gp.param = unsafe.Pointer(sg) // 将接收方的 sg 传递给该 Goroutine
sg.success = true // 标记发送操作成功
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1) // 将接收方协程 gp 设置为就绪状态,准备继续执行
}

缓冲区

如果创建的 Channel 包含缓冲区,且 Channel 中的数据没有装满:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......
if c.qcount < c.dataqsiz {
// 计算缓冲区中下一个可以存储数据的地址 qp
qp := chanbuf(c, c.sendx)
// 将待发送的数据 ep 拷贝到缓冲区地址 qp;并增加 sendx 索引和 qcount 计数器
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0 // buf 是一个循环数组;所以当 sendx 等于 dataqsiz 时:会重新回到数组开始的位置
}
c.qcount++
unlock(&c.lock)
return true
}
......
}

下图展示相应流程:当前sendx=6,recvx=4:将待发送的数据拷贝到sendx=6的位置;移动sendx到下一个位置。

阻塞发送

当 Channel 没有接收者能够处理数据时,向 Channel 发送数据会被下游阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
......
if !block {
unlock(&c.lock)
return false
}

// 1. 获取发送数据使用的 Goroutine
gp := getg()
// 2. 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息:例如发送的 Channel、是否在 select 中和待发送数据的内存地址等
mysg := acquireSudog()
// 3. 将刚刚创建并初始化的 runtime.sudog 加入发送等待队列 sendq,并设置到当前 Goroutine 的 waiting 上,表示 Goroutine 正在等待该 sudog 准备就绪
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
......
// 使得当前的发送者 Goroutine 陷入沉睡,等待唤醒,让出处理器的使用权
gopark(chanparkcommit, unsafe.Pointer(&c.lock), reason, traceBlockChanSend, 2)
KeepAlive(ep)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
......
releaseSudog(mysg)
......
return true
}

小结一下:向Channel发送数据的3种情况:

  1. 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine;
  2. 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区 sendx 所在的位置上;
  3. 如果不满足上面的两种情况,会创建一个 runtime.sudog 结构并将其加入 Channel 的 sendq 队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;

触发Goroutine调度的时机:

  1. 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的 runnext 属性,但是并不会立刻触发调度;
  2. 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 Channel 的 sendq 队列并调用 runtime.goparkunlock 触发 Goroutine 的调度让出处理器的使用权;

接收数据

Go 语言中可以使用两种不同的方式去接收 Channel 中的数据:

1
2
i <- ch
i, ok <- ch

最终都会调用runtime.chanrecv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 1. 如果从一个空 Channel 接收数据,会直接调用 runtime.gopark 让出处理器的使用权
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}

lock(&c.lock)
// 2. 如果当前 Channel 已被关闭,且缓冲区不存在任何数据:清除ep指针中的数据,返回(true, false)
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}

以上两种为特殊情况,除此之外,还有3种情形:(针对发送方状态)

  1. 当发送队列sendq存在等待的Goroutine 时:通过 runtime.recv 直接从阻塞的发送者或者缓冲区中获取数据;
  2. 第1条不满足,且缓冲区中存在数据时:从 Channel 的缓冲区中接收数据;
  3. 第1,2条不满足,且缓冲区中不存在数据时:等待其他 Goroutine 向 Channel 发送数据。

直接接收

当Channel的sendq队列中包含等待的Goroutine 时:取出队头等待的Goroutine(和直接发送的处理逻辑较像)。

1
2
3
4
5
6
7
8
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}

调用 runtime.recv ,分为两个部分:

  1. 将发送者的数据sg放入Channel,发送者被唤醒以继续执行;
  2. 被接收者(当前Goroutine)接收到的数据,被写入ep
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
......
// 1.1 无缓冲的Channel(同步通道):
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// 将 Channel 发送队列中 Goroutine 存储的 elem 数据拷贝到目标内存地址中
recvDirect(c.elemtype, sg, ep)
}
} else { // 1.2 有缓冲的Channel(异步通道):
// 从接收队列头部,取出一个元素 qp
qp := chanbuf(c, c.recvx)
// 接收端:qp 拷贝到接收方的内存地址 ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 发送端:将发送者的数据从 sg.elem 拷贝到队列中当前位置 qp
typedmemmove(c.elemtype, qp, sg.elem)
// 更新接收指针
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
// 2. 完成接收操作:
sg.elem = nil
gp := sg.g
unlockf() // 解锁通道
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}

下图展示相应流程。缓冲区已满,发送队列中存在等待的Gouroutine时:使用发送队列头 runtime.sudog 中的元素替换接收索引 recvx 所在位置的元素;recvx原有元素被拷贝到接收数据的变量对应的内存空间上。

缓冲区

当Channel的缓冲区中已经包含数据时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c.qcount > 0 {
// 获取recvx位置
qp := chanbuf(c, c.recvx)
if ep != nil {
// 将缓冲区地址 qp 中的数据拷贝到接收端内存地址 ep;并增加 recv 索引,减少 qcount 计数器
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock) // 释放Channel持有的锁
return true, true
}
}

下图展示相应流程:当前sendx=4,recvx=6:将待接收的数据从recvx=6的位置拷贝到内存地址;移动recvx到下一个位置。

阻塞接收

当 Channel 没有发送者能够处理数据时,从 Channel 接收数据会被阻塞。(然而不是所有的接收操作都是阻塞的,与 select 语句结合使用时就可能会使用到非阻塞的接收操作)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// selected:是否选中;received:是否成功接收到数据
...
// 1. 非阻塞模式:直接解锁Channel的锁
if !block {
unlock(&c.lock)
return false, false
}
// 2. 阻塞模式:需要阻塞以等待数据
gp := getg() // 获取当前goroutine
mysg := acquireSudog() // 创建一个新的sudog(代表一个等待的goroutine)
mysg.elem = ep
gp.waiting = mysg
mysg.g = gp
mysg.c = c
// 2.1 将 mysg(当前正在等待接收的 goroutine)加入到通道 c 的接收队列中
c.recvq.enqueue(mysg)
// 2.2 让出处理器的使用权并等待调度器的调度
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// 2.3 恢复并释放资源
gp.waiting = nil
closed := gp.param == nil
gp.param = nil
releaseSudog(mysg)
return true, !closed
}

小结一下:从Channel中接收数据的5种情况:

  1. 如果 Channel 为空,那么会直接调用 runtime.gopark 挂起当前 Goroutine;
  2. 如果 Channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接返回;
  3. 如果 Channel 的 sendq 队列中存在挂起的 Goroutine,会将 recvx 索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq 队列中 Goroutine 的数据拷贝到缓冲区;
  4. 如果 Channel 的缓冲区中包含数据,那么直接读取 recvx 索引对应的数据;
  5. 在默认情况下会挂起当前的 Goroutine,将 runtime.sudog 结构加入 recvq 队列并陷入休眠等待调度器的唤醒;

触发Goroutine调度的两个时机:

  1. 当 Channel 为空时;
  2. 当缓冲区中不存在数据并且也不存在数据的发送者时。

关闭Channel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func closechan(c *hchan) {
// 1. 处理Channel是空指针/已被关闭的情况:直接崩溃,抛出异常
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1

// 2. 将 recvq 和 sendq 两个队列中的数据,加入到Goroutine列表gList中,同时清除所有 runtime.sudog 上未被处理的元素:
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}

// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
unlock(&c.lock)

// 为所有被阻塞的 Goroutine 调用 runtime.goready 触发调度
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}