Go-channel
Go-Channel
Channel是Go的核心数据结构和Goroutine之间的通信方式,支持Go的高性能并发编程模型。
设计原理
不通过共享内存的方式通信,而是通过通信的方式共享内存,采用的并发模式为:CSP(通信顺序进程)。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。
怎么理解“不通过共享内存的方式通信,而是通过通信的方式共享内存”这句话呢?
前半句指通过 sync 包里的一些组件进行并发编程;后半句指使用 channel 进行并发编程。实际上,channel 的底层就是通过 mutex 来控制并发的。只是 channel 是更高一层次的并发编程原语,封装了更多的功能。
上图中的两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。
FIFO
Channel的收发操作遵循“先进先出”:
- 先从 Channel 读取数据的 Goroutine 会先接收到数据;
- 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利。
无锁管道
锁作为一种并发控制技术,通常分为:乐观锁和悲观锁。无锁队列对应使用乐观锁的队列。
- 乐观锁:本质是基于验证的协议,使用原子指令CAS在多线程之间同步数据。无锁队列的实现也依赖这一原子指令。
Channel在运行时的内部表示是runtime.hchan
,包含互斥锁,因此是一个用于同步和通信的有锁队列。
然而,锁导致的休眠和唤醒将带来额外的上下文切换;如果临界区过大,加锁解锁导致的额外开销,将成为性能瓶颈。Go社区曾提出了无锁Channel实现方案,分为以下3种类型:
- 同步Channel(不带缓冲):,发送方直接将数据交给Handoff接收方;
- 同步模式下,当且仅当发送方和接收方都ready的情况下,数据才能传输(本质上是内存拷贝);否则,任意一方先行进行发送或接收操作,都会被挂起,等待另一方的出现才能被唤醒。
- 异步Channel(带缓冲):基于环形缓存的传统生产者消费者模型;
- 在缓冲槽有剩余容量的情况下,发送和接收操作都可以顺利进行。否则,操作的一方(如写入)同样会被挂起,直到出现相反操作(如接收)才会被唤醒。
chan struct{}
类型的异步Channel:struct{}
类型不占用内存空间,无需实现缓冲区和直接发送Handoff的语义。
这个提案的目的也不是实现完全无锁的队列,只是在一些关键路径上通过无锁提升 Channel 的性能。
数据结构
1 | type hchan struct { |
sendq
和 recvq
存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表 runtime.waitq
表示,链表中所有的元素都是 runtime.sudog
结构(表示一个在等待列表中的 Goroutine):
1 | type waitq struct { |
创建Channel
Channel 是一个引用类型,所以在被初始化之前,它的值为nil
;所有 Channel 的创建都会使用 make
关键字进行初始化。例如:make(chan int, 10)
。传递的int值代表Channel缓冲区的容量,构造的是一个缓冲Channel;不传int值或者传0的,构造的是一个非缓冲Channel。
1 | // 无缓冲通道 |
经过一系列编译步骤(为啥不写,因为我半天没弄懂),用于运行时创建chan
的函数是makechan
:
1 | func makechan(t *chantype, size int) *hchan { |
发送数据
当需要向Channel发送数据时,使用ch <- i
语句,实际上调用runtime.chansend
。如果在调用时将 block
参数设置成 true
,那么表示当前发送操作是阻塞的:
1 | func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { |
分为3个部分:(针对接收方状态)
- 当接收队列存在等待的Goroutine 时:通过
runtime.send
直接将数据发送给阻塞的接收者; - 第1条不满足,且缓冲区存在剩余空间时:将发送的数据写入 Channel 的缓冲区;
- 第1,2条不满足,且不存在缓冲区或者缓冲区已满时:等待其他 Goroutine 从 Channel 接收数据。
直接发送
如果目标Channel没有被关闭,且已经有处于读等待的 Goroutine:从接收队列 recvq
中取出最先陷入等待的 Goroutine ,并直接向它发送数据:
1 | func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { |
发送数据时调用 runtime.send
,分为2个部分:
- 调用
runtime.sendDirect
将发送的数据直接拷贝到x = <-c
表达式中变量x
所在的内存地址上; - 调用
runtime.goready
将等待接收数据的 Goroutine 标记成可运行状态Grunnable
, 并把该 Goroutine 放到发送方所在的处理器的runnext
上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;- 注:发送数据的过程只是将接收方的 Goroutine 放到了处理器的
runnext
中,程序没有立刻执行该 Goroutine。
- 注:发送数据的过程只是将接收方的 Goroutine 放到了处理器的
1 | func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { |
缓冲区
如果创建的 Channel 包含缓冲区,且 Channel 中的数据没有装满:
1 | func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { |
下图展示相应流程:当前sendx
=6,recvx
=4:将待发送的数据拷贝到sendx
=6的位置;移动sendx
到下一个位置。
阻塞发送
当 Channel 没有接收者能够处理数据时,向 Channel 发送数据会被下游阻塞。
1 | func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { |
小结一下:向Channel发送数据的3种情况:
- 如果当前 Channel 的
recvq
上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine;- 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区
sendx
所在的位置上;- 如果不满足上面的两种情况,会创建一个
runtime.sudog
结构并将其加入 Channel 的sendq
队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;触发Goroutine调度的时机:
- 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的
runnext
属性,但是并不会立刻触发调度;- 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 Channel 的
sendq
队列并调用runtime.goparkunlock
触发 Goroutine 的调度让出处理器的使用权;
接收数据
Go 语言中可以使用两种不同的方式去接收 Channel 中的数据:
1 | i <- ch |
最终都会调用runtime.chanrecv
。
1 | func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { |
以上两种为特殊情况,除此之外,还有3种情形:(针对发送方状态)
- 当发送队列
sendq
存在等待的Goroutine 时:通过runtime.recv
直接从阻塞的发送者或者缓冲区中获取数据; - 第1条不满足,且缓冲区中存在数据时:从 Channel 的缓冲区中接收数据;
- 第1,2条不满足,且缓冲区中不存在数据时:等待其他 Goroutine 向 Channel 发送数据。
直接接收
当Channel的sendq
队列中包含等待的Goroutine 时:取出队头等待的Goroutine(和直接发送的处理逻辑较像)。
1 | if sg := c.sendq.dequeue(); sg != nil { |
调用 runtime.recv
,分为两个部分:
- 将发送者的数据
sg
放入Channel,发送者被唤醒以继续执行; - 被接收者(当前Goroutine)接收到的数据,被写入
ep
。
1 | func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { |
下图展示相应流程。缓冲区已满,发送队列中存在等待的Gouroutine时:使用发送队列头 runtime.sudog
中的元素替换接收索引 recvx
所在位置的元素;recvx
原有元素被拷贝到接收数据的变量对应的内存空间上。
缓冲区
当Channel的缓冲区中已经包含数据时:
1 | func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { |
下图展示相应流程:当前sendx
=4,recvx
=6:将待接收的数据从recvx
=6的位置拷贝到内存地址;移动recvx
到下一个位置。
阻塞接收
当 Channel 没有发送者能够处理数据时,从 Channel 接收数据会被阻塞。(然而不是所有的接收操作都是阻塞的,与 select
语句结合使用时就可能会使用到非阻塞的接收操作)
1 | func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { |
小结一下:从Channel中接收数据的5种情况:
- 如果 Channel 为空,那么会直接调用
runtime.gopark
挂起当前 Goroutine;- 如果 Channel 已经关闭并且缓冲区没有任何数据,
runtime.chanrecv
会直接返回;- 如果 Channel 的
sendq
队列中存在挂起的 Goroutine,会将recvx
索引所在的数据拷贝到接收变量所在的内存空间上并将sendq
队列中 Goroutine 的数据拷贝到缓冲区;- 如果 Channel 的缓冲区中包含数据,那么直接读取
recvx
索引对应的数据;- 在默认情况下会挂起当前的 Goroutine,将
runtime.sudog
结构加入recvq
队列并陷入休眠等待调度器的唤醒;触发Goroutine调度的两个时机:
- 当 Channel 为空时;
- 当缓冲区中不存在数据并且也不存在数据的发送者时。
关闭Channel
1 | func closechan(c *hchan) { |