Go-channel
Go-Channel
Channel 是 Go 的核心数据结构和 Goroutine 之间的通信方式,支持 Go 的高性能并发编程模型。
设计原理
不通过共享内存的方式通信,而是通过通信的方式共享内存,采用的并发模式为:CSP(通信顺序进程)。Goroutine 和 Channel 分别对应 CSP 中的实体和传递信息的媒介,Goroutine 之间会通过 Channel 传递数据。
怎么理解“不通过共享内存的方式通信,而是通过通信的方式共享内存”这句话呢?
前半句指通过 sync 包里的一些组件进行并发编程;后半句指使用 channel 进行并发编程。实际上,channel 的底层就是通过 mutex 来控制并发的。只是 channel 是更高一层次的并发编程原语,封装了更多的功能。
上图中的两个 Goroutine,一个会向 Channel 中发送数据,另一个会从 Channel 中接收数据,它们两者能够独立运行并不存在直接关联,但是能通过 Channel 间接完成通信。
使用通信来共享内存是 Go 的设计哲学,那么为什么是这样的呢?
- 使用发送/接受消息来同步信息,相较于直接使用共享内存和互斥锁,是一种更高级的抽象,提供更好的封装;
- 便于将线程的职责分为生产者和消费者,通过消息传递的方式解耦;
- 消息发送的方式,保证同一时间只有一个活跃的线程能访问数据,从设计上天然地避免线程竞争和数据冲突。
FIFO
Channel的收发操作遵循“先进先出”:
- 先从 Channel 读取数据的 Goroutine 会先接收到数据;
- 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利。
无锁管道
锁作为一种并发控制技术,通常分为:乐观锁和悲观锁。无锁队列对应使用乐观锁的队列。
- 乐观锁:本质是基于验证的协议,使用原子指令CAS在多线程之间同步数据。无锁队列的实现也依赖这一原子指令。
Channel在运行时的内部表示是runtime.hchan,包含互斥锁,因此是一个用于同步和通信的有锁队列。
然而,锁导致的休眠和唤醒将带来额外的上下文切换;如果临界区过大,加锁解锁导致的额外开销,将成为性能瓶颈。Go社区曾提出了无锁Channel实现方案,分为以下3种类型:
- 同步Channel(不带缓冲):,发送方直接将数据交给Handoff接收方;
- 同步模式下,当且仅当发送方和接收方都ready的情况下,数据才能传输(本质上是内存拷贝);否则,任意一方先行进行发送或接收操作,都会被挂起,等待另一方的出现才能被唤醒。
- 异步Channel(带缓冲):基于环形缓存的传统生产者消费者模型;
- 在缓冲槽有剩余容量的情况下,发送和接收操作都可以顺利进行。否则,操作的一方(如写入)同样会被挂起,直到出现相反操作(如接收)才会被唤醒。
chan struct{}类型的异步Channel:struct{}类型不占用内存空间,无需实现缓冲区和直接发送Handoff的语义。
这个提案的目的也不是实现完全无锁的队列,只是在一些关键路径上通过无锁提升 Channel 的性能。
数据结构
1 | type hchan struct { |
sendq 和 recvq 存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表 runtime.waitq 表示,链表中所有的元素都是 runtime.sudog 结构(表示一个在等待列表中的 Goroutine):
1 | type waitq struct { |
创建Channel
Channel 是一个引用类型,所以在被初始化之前,它的值为nil;所有 Channel 的创建都会使用 make 关键字进行初始化。例如:make(chan int, 10)。传递的int值代表Channel缓冲区的容量,构造的是一个缓冲Channel;不传int值或者传0的,构造的是一个非缓冲Channel。
1 | // 无缓冲通道 |
经过一系列编译步骤(为啥不写,因为我半天没弄懂),用于运行时创建chan的函数是makechan:
1 | func makechan(t *chantype, size int) *hchan { |
发送数据
当需要向Channel发送数据时,使用ch <- i语句,实际上调用runtime.chansend。如果在调用时将 block 参数设置成 true,那么表示当前发送操作是阻塞的:
1 | func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { |
分为3个部分:(针对接收方状态)
- 当接收队列存在等待的Goroutine 时:通过
runtime.send直接将数据发送给阻塞的接收者; - 第1条不满足,且缓冲区存在剩余空间时:将发送的数据写入 Channel 的缓冲区;
- 第1,2条不满足,且不存在缓冲区或者缓冲区已满时:等待其他 Goroutine 从 Channel 接收数据。
直接发送
如果目标Channel没有被关闭,且已经有处于读等待的 Goroutine:从接收队列 recvq 中取出最先陷入等待的 Goroutine ,并直接向它发送数据:
1 | func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { |
发送数据时调用 runtime.send,分为2个部分:
- 调用
runtime.sendDirect将发送的数据直接拷贝到x = <-c表达式中变量x所在的内存地址上; - 调用
runtime.goready将等待接收数据的 Goroutine 标记成可运行状态Grunnable, 并把该 Goroutine 放到发送方所在的处理器的runnext上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;- 注:发送数据的过程只是将接收方的 Goroutine 放到了处理器的
runnext中,程序没有立刻执行该 Goroutine。
- 注:发送数据的过程只是将接收方的 Goroutine 放到了处理器的
1 | func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { |
缓冲区
如果创建的 Channel 包含缓冲区,且 Channel 中的数据没有装满:
1 | func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { |
下图展示相应流程:当前sendx=6,recvx=4:将待发送的数据拷贝到sendx=6的位置;移动sendx到下一个位置。
阻塞发送
当 Channel 没有接收者能够处理数据时,向 Channel 发送数据会被下游阻塞。
1 | func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { |
小结一下:向Channel发送数据的3种情况:
- 如果当前 Channel 的
recvq上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine;- 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区
sendx所在的位置上;- 如果不满足上面的两种情况,会创建一个
runtime.sudog结构并将其加入 Channel 的sendq队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;触发Goroutine调度的时机:
- 发送数据时发现 Channel 上存在等待接收数据的 Goroutine,立刻设置处理器的
runnext属性,但是并不会立刻触发调度;- 发送数据时并没有找到接收方并且缓冲区已经满了,这时会将自己加入 Channel 的
sendq队列并调用runtime.goparkunlock触发 Goroutine 的调度让出处理器的使用权;
接收数据
Go 语言中可以使用两种不同的方式去接收 Channel 中的数据:
1 | i <- ch |
最终都会调用runtime.chanrecv。
1 | func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { |
以上两种为特殊情况,除此之外,还有3种情形:(针对发送方状态)
- 当发送队列
sendq存在等待的Goroutine 时:通过runtime.recv直接从阻塞的发送者或者缓冲区中获取数据; - 第1条不满足,且缓冲区中存在数据时:从 Channel 的缓冲区中接收数据;
- 第1,2条不满足,且缓冲区中不存在数据时:等待其他 Goroutine 向 Channel 发送数据。
直接接收
当Channel的sendq队列中包含等待的Goroutine 时:取出队头等待的Goroutine(和直接发送的处理逻辑较像)。
1 | if sg := c.sendq.dequeue(); sg != nil { |
调用 runtime.recv ,分为两个部分:
- 将发送者的数据
sg放入Channel,发送者被唤醒以继续执行; - 被接收者(当前Goroutine)接收到的数据,被写入
ep。
1 | func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { |
下图展示相应流程。缓冲区已满,发送队列中存在等待的Gouroutine时:使用发送队列头 runtime.sudog 中的元素替换接收索引 recvx 所在位置的元素;recvx原有元素被拷贝到接收数据的变量对应的内存空间上。
缓冲区
当Channel的缓冲区中已经包含数据时:
1 | func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { |
下图展示相应流程:当前sendx=4,recvx=6:将待接收的数据从recvx=6的位置拷贝到内存地址;移动recvx到下一个位置。
阻塞接收
当 Channel 没有发送者能够处理数据时,从 Channel 接收数据会被阻塞。(然而不是所有的接收操作都是阻塞的,与 select 语句结合使用时就可能会使用到非阻塞的接收操作)
1 | func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { |
小结一下:从Channel中接收数据的5种情况:
- 如果 Channel 为空,那么会直接调用
runtime.gopark挂起当前 Goroutine;- 如果 Channel 已经关闭并且缓冲区没有任何数据,
runtime.chanrecv会直接返回;- 如果 Channel 的
sendq队列中存在挂起的 Goroutine,会将recvx索引所在的数据拷贝到接收变量所在的内存空间上并将sendq队列中 Goroutine 的数据拷贝到缓冲区;- 如果 Channel 的缓冲区中包含数据,那么直接读取
recvx索引对应的数据;- 在默认情况下会挂起当前的 Goroutine,将
runtime.sudog结构加入recvq队列并陷入休眠等待调度器的唤醒;触发Goroutine调度的两个时机:
- 当 Channel 为空时;
- 当缓冲区中不存在数据并且也不存在数据的发送者时。
关闭Channel
1 | func closechan(c *hchan) { |






