Go-调度器

多个线程可以属于同一个进程并共享内存空间。因此它们也不需要内存管理单元处理上下文的切换,线程之间的通信是基于共享的内存进行的

虽然线程比较轻量,但是在调度时也有比较大的额外开销。每个线程会都占用 1M 以上的内存空间,在切换线程时不止会消耗较多的内存,恢复寄存器中的内容还需要向操作系统申请或者销毁资源,每一次线程上下文的切换都需要消耗 ~1us 左右的时间1,但是 Go 调度器对 Goroutine 的上下文切换约为 ~0.2us,减少了 80% 的额外开销。

Go 语言的调度器通过使用与 CPU 数量相等的线程减少线程频繁切换的内存开销,同时在每一个线程上执行额外开销更低的 Goroutine 来降低操作系统和硬件的负载。

设计原理

包括以下几个版本:

  1. 单线程调度器:程序中只能存在一个活跃线程,由G-M模型组成;

单线程调度器

0.x 版本调度器只包含:Goroutine(G)和 线程(M)两种结构,全局只有一个线程。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static void
scheduler(void)
{
G* gp;
m->procid = getprocid(); // 保存当前进程的ID
lock(&sched);

// 1. 保存当前机器上下文,并尝试恢复(即切换到另一个goroutine)
if(gosave(&m->sched)){
// 如果函数返回 true,表示该函数是通过 gogo 或者 gosave 跳转到当前执行点的:之前的 lock(&sched) 未能执行,需要在此重新锁住调度器
lock(&sched);

gp = m->curg; // 将当前执行的 goroutine 保存到 gp
gp->m = nil;

// 2. 状态切换
switch(gp->status){ // 检查当前 goroutine 的状态
case Grunnable:
case Gdead:
throw("bad gp->status in sched");
case Grunning: // 正在运行:放回就绪队列
gp->status = Grunnable;
gput(gp);
break;
case Gmoribund: // 即将死亡
gp->status = Gdead;
if(--sched.gcount == 0)
sys·exit(0);
break;
}
notewakeup(&gp->stopped); // 通知 gp 已经停止,标记当前goroutine已经完成,准备返回调度器
}

// 3. 获取下一个准备运行的goroutine:如果没有goroutine可以执行,它会使当前线程阻塞等待
gp = nextgandunlock();

noteclear(&gp->stopped); // 清除 gp 上的停止标记:表示当前的goroutine已经准备好运行
gp->status = Grunning;
m->curg = gp; // 更新 m->curg 为当前要运行的goroutine gp
gp->m = m; // for debugger
g = gp; // 全局 g 设置为当前正在运行的 goroutine
gogo(&gp->sched); // 恢复goroutine的执行状态,继续运行
}

包括以下步骤:

  1. 保存和恢复上下文lock(&sched)获取调度器的全局锁,调用 runtime.gosave:9682400 保存栈寄存器和程序计数器;
  2. 选择下一个要运行的goroutine:调用 runtime.nextgandunlock:9682400 获取下一个需要运行的 Goroutine 并解锁调度器;修改全局线程 m 上要执行的 Goroutine;调用 runtime.gogo:9682400 函数运行最新的 Goroutine;

多线程调度器

整体的逻辑与单线程调度器差异不大。因为程序中可能同时存在多个活跃线程,所以多线程调度器引入 GOMAXPROCS 变量帮助灵活控制程序中的最大处理器数,即活跃线程数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static void schedule(G *gp) {
schedlock();
if(gp != nil) {
gp->m = nil;
uint32 v = runtime·xadd(&runtime·sched.atomic, -1<<mcpuShift);
if(atomic_mcpu(v) > maxgomaxprocs)
runtime·throw("negative mcpu in scheduler");

switch(gp->status){
case Grunning:
gp->status = Grunnable;
gput(gp);
break;
case ...:
}
} else {
...
}
gp = nextgandunlock();
gp->status = Grunning;
m->curg = gp;
gp->m = m;
runtime·gogo(&gp->sched, 0);
}

多线程调度器的问题:锁竞争严重浪费资源。有以下问题待解决:

  1. 单一全局互斥锁(Sched.Lock)和集中的状态:当前的调度器使用一个全局的互斥锁保护所有与goroutine相关的操作(如创建、完成、重新调度等);
  2. Goroutine的传递(G.nextg:工作线程(M)经常在它们之间传递可运行的goroutine,这可能导致延迟和额外的开销。每个M必须能够执行任何可运行的G,特别是刚刚创建的G;
  3. 每个M的内存缓存(M.mcache:内存缓存和其他缓存(如栈分配)与所有M关联,但实际上这些缓存只需要与正在运行Go代码的M关联(处于系统调用中的M不需要缓存)。在一些情况下,运行Go代码的M与所有M的比例可能高达1:100,这会导致过度的资源消耗(每个MCache可能会占用多达2MB的内存)和较差的数据局部性;
  4. 频繁的线程阻塞和解除阻塞:在系统调用的情况下,工作线程会频繁地被阻塞和解除阻塞,这增加了额外的开销。

任务窃取调度器

Scalable Go Scheduler Design Doc 对于多线程调度器的若干问题,引入处理器P,在此基础上实现任务窃取调度器。

  • M表示操作系统线程(与现有实现相同);
  • P表示执行Go代码所需的资源:当M执行Go代码时,它需要一个与之关联的P;当M处于空闲或系统调用状态时,不需要P。系统中有GOMAXPROCSP,所有P被组织成一个数组(调整 GOMAXPROCS 时,需要暂停并重新启动程序以调整P的数量)

处理器P

相较于多线程调度器:调度器的一些变量将从sched中去中心化并移动到P中,部分与Go代码执行相关的M的变量也将移到P中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
struct P {
uint32 status;
P* link;
uint32 tick;
M* m; // 线程
MCache* mcache;

G** runq;
int32 runqhead;
int32 runqtail;
int32 runqsize;

G* gfree;
int32 gfreecnt;
};

P *allp; // [GOMAXPROCS]

P *idlep; // 空闲P的无锁列表

处理器持有一个由就绪的Goroutine组成的环形就绪队列runq,且反向持有一个线程;调度时从处理器P的就绪队列中,选择队头的Goroutine放到线程M上执行。

基于工作窃取的多线程调度器将每一个线程绑定到了独立的 CPU 上(M和P绑定);这些线程会被不同处理器管理,不同的处理器P通过工作窃取对Goroutine进行再分配实现任务的平衡,能提升调度器和 Go 语言程序的整体性能。

调度

  1. 当创建一个新的G或者一个G转为就绪状态时:当前G被推入当前Pgoroutines的就绪队列;
  2. P完成G的执行后,首先尝试从自己的goroutine就绪队列中弹出一个G以继续执行;如果队列为空,P会选择一个随机的受害者(另一个P),并尝试从它那里窃取一半的就绪goroutines。(充分利用处理器资源)

Syscall/M的挂靠和解挂靠

M创建一个新的G时,必须确保有一个M来执行该G(如果所有M都已经忙碌);同样,当M进入系统调用时,它必须确保有其他的M可以继续执行Go代码。有两种选择:立即阻塞和解除阻塞M;或者使用一些自旋(消耗一些CPU周期)。这是性能和不必要的CPU消耗之间的冲突。为了避免影响GOMAXPROCS=1的程序,采取了一些自旋策略。

自旋是分两级的:

  1. P关联的空闲M在等待新G时进行自旋;
  2. P关联的M等待可用的P

这种自旋方式主要是被动的(通过yieldsched_yield()),但可能会包括一些主动的自旋(通过循环消耗CPU),需要进一步研究和优化。

自旋锁:一种轻量级的同步机制,广泛应用于多线程编程和操作系统内核中。它通过忙等待(busy-waiting)的方式,让线程在尝试获取锁时不断循环检查锁的状态,直到成功获取锁为止

看看一个版本的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static void
schedule(void)
{
G *gp;
// 1. 检查当前线程m是否持有锁
if(m->locks)
runtime·throw("schedule: holding locks");
top:
// 2. 等待垃圾回收
if(runtime·gcwaiting) {
gcstopm();
goto top;
}

// 3. 从当前处理器上的运行队列获取一个就绪的goroutine
gp = runqget(m->p);
// 4. 没有就绪的goroutine:查找其他P得就绪goroutine
if(gp == nil)
gp = findrunnable();
// 5. 检查当前线程m是否在进行自旋操作:
if(m->spinning) {
// 自旋完成
m->spinning = false;
runtime·xadd(&runtime·sched.nmspinning, -1);
}

// 6. 检查是否需要唤醒其他OS线程(当前P的运行队列存在剩余Goroutine,当前没有自旋线程,存在空闲P):唤醒一个空闲的OS线程
if (m->p->runqhead != m->p->runqtail &&
runtime·sched.nmspinning == 0 &&
runtime·sched.npidle > 0)
wakep();
// 7. 如果要执行的goroutine被锁定:
if(gp->lockedm) {
startlockedm(gp); // 启动一个锁定的M执行该goroutine
goto top;
}
// 8. 执行选定的goroutine
execute(gp);
}

工作流程:

  1. 当一个Goroutine被创建时:它被放入一个P的本地队列;
  2. 若P的本地队列满了,或者某个G长期未被调度执行:P尝试从全局队列获取G
  3. 若全局队列为空:P从其他P的本地队列中,窃取一些G,放在当前M上运行,以保证尽可能多地利⽤所有的处理器。

工作窃取函数:runtime.findrunnable:779c45a

抢占式调度器

解决1.1版本中,程序只能依靠Goroutine主动让出CPU资源才能触发调度的问题。

基于协作的抢占式调度

工作原理:

  1. 编译器会在调用函数前插入 runtime.morestack
  2. Go 语言运行时会在垃圾回收暂停程序、系统监控发现 Goroutine 运行超过 10ms 时发出抢占请求 StackPreempt
  3. 当发生函数调用时,可能会执行编译器插入的 runtime.morestack,它调用的 runtime.newstack 检查 Goroutine 的 stackguard0 字段是否为 StackPreempt
  4. 如果 stackguard0StackPreempt,就会触发抢占让出当前线程;

抢占通过编译器在函数调用时插入抢占检查指令实现:在函数调用时检查当前Goroutine是否发起了抢占请求,因此需要函数调用作为入口才能触发抢占

然而,Goroutine可能因为垃圾回收和循环长时间占用资源导致程序暂停。

基于信号的抢占式调度

  1. 注册信号处理函数:程序启动时,在 runtime.sighandler 中注册 SIGURG 信号的处理函数 runtime.doSigPreempt
  2. 在触发垃圾回收的栈扫描时,会调用 runtime.suspendG 挂起 Goroutine,该函数会执行下面的逻辑:
    1. _Grunning 状态的 Goroutine 标记成可以被抢占,即将 preemptStop 设置成 true
    2. 调用 runtime.preemptM 触发抢占;
  3. 发送信号runtime.preemptM 会调用 runtime.signalM 向线程发送信号 SIGURG
  4. 接收信号后,OS执行信号处理函数:操作系统会中断正在运行的线程,并执行预先注册的信号处理函数 runtime.doSigPreempt
  5. runtime.doSigPreempt 函数会处理抢占信号,获取当前的 SP 和 PC 寄存器并调用 runtime.sigctxt.pushCall`;
  6. runtime.sigctxt.pushCall 会修改寄存器并在程序回到用户态时执行 runtime.asyncPreempt
  7. 汇编指令 runtime.asyncPreempt 会调用运行时函数 runtime.asyncPreempt2
  8. runtime.asyncPreempt2 会调用 runtime.preemptPark
  9. runtime.preemptPark修改当前 Goroutine 的状态到 _Gpreempted,并调用 runtime.schedule 让当前函数陷入休眠并让出线程,调度器会选择其它的 Goroutine 继续执行;

这里抢占的安全点包括:STW(Stop-the-world,垃圾回收时暂停整个程序);栈扫描

数据结构

调度器包括3个重要组成部分:

  1. G:表示Goroutine,一个待执行的任务;
  2. M:表示操作系统线程;
  3. P:表示处理器。

G

Goroutine 是 Go 语言调度器中待执行的任务,只存在于Go语言运行时,是Go语言在用户态提供的线程。使用结构体runtime.g

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type g struct {
// 1. 栈相关字段
stack stack // 当前 Goroutine 的栈内存范围 [stack.lo, stack.hi)
stackguard0 uintptr // 用于抢占式调度

// 2. 抢占式调度相关字段
preempt bool // 抢占信号
preemptStop bool // 抢占时将状态修改成 `_Gpreempted`
preemptShrink bool // 在同步安全点收缩栈

// 3. 存储 defer 和 panic 对应结构体的链表
_panic *_panic // 最内侧的 panic 结构体
_defer *_defer // 最内侧的延迟函数结构体

// 4. 其他字段
m *m // 当前 Goroutine 占用的线程,可能为空
sched gobuf // 存储 Goroutine 的调度相关的数据
atomicstatus uint32 // Goroutine 的状态
goid int64 // Goroutine 的 ID(该字段对开发者不可见)
}

sched字段的runtime.gobuf结构体

1
2
3
4
5
6
7
type gobuf struct {
sp uintptr // 栈指针
pc uintptr // 程序计数器
g guintptr // 持有`runtime.gobuf` 的 Goroutine(对应sched字段)
ret sys.Uintreg // syscall返回值
...
}

Goroutine的状态(对应atomicstatus字段)

状态描述
_Gidle刚刚被分配并且还没有被初始化
_Grunnable没有执行代码,没有栈的所有权,存储在运行队列中
_Grunning可以执行代码,拥有栈的所有权,被赋予了内核线程 M 和处理器 P
_Gsyscall正在执行系统调用,拥有栈的所有权,没有执行用户代码,被赋予了内核线程 M 但是不在运行队列上
_Gwaiting由于运行时而被阻塞,没有执行用户代码并且不在运行队列上,但是可能存在于 Channel 的等待队列上
_Gdead没有被使用,没有执行代码,可能有分配的栈
_Gcopystack栈正在被拷贝,没有执行代码,不在运行队列上
_Gpreempted由于抢占而被阻塞,没有执行用户代码并且不在运行队列上,等待唤醒
_GscanGC 正在扫描栈空间,没有执行代码,可以与其他状态同时存在

主要归为3类:

  1. 等待中:Goroutine 正在等待某些条件满足,例如:系统调用结束等,包括 _Gwaiting_Gsyscall_Gpreempted 几个状态;
  2. 可运行:Goroutine 已经准备就绪,可以在线程运行,如果当前程序中有非常多的 Goroutine,每个 Goroutine 就可能会等待更多的时间,即 _Grunnable
  3. 运行中:Goroutine 正在某个线程上运行,即 _Grunning

M

M是操作系统线程。调度器最多可以创建 10000 个线程,但是其中大多数的线程都不会执行用户代码(可能陷入系统调用),最多只会有 GOMAXPROCS 个活跃线程能够正常运行。(默认情况下,运行时会将 GOMAXPROCS 设置成当前机器的核数,不会频繁触发操作系统的线程调度和上下文切换,所有的调度都发生在用户态,由 Go 语言调度器触发,减少额外开销)

在默认情况下,一个四核机器会创建四个活跃的操作系统线程,每一个线程都对应一个运行时中的 runtime.m 结构体

1
2
3
4
5
6
7
8
9
10
type m struct {
// 1. Goroutine 相关字段
g0 *g // 持有调度栈的 Goroutine
curg *g // 在当前线程上运行的用户 Goroutine

// 2. 处理器相关字段
p puintptr // 正在运行代码的处理器 p
nextp puintptr // 暂存的处理器 nextp
oldp puintptr // 执行系统调用之前使用线程的处理器 oldp
}

P

处理器P是线程和Goroutine的中间层,能提供线程需要的上下文环境,负责调度线程上的等待队列;通过处理器 P 的调度,每一个内核线程都能够执行多个 Goroutine,它能在 Goroutine 进行一些 I/O 操作时及时让出计算资源,提高线程的利用率。

调度器在启动时会创建 GOMAXPROCS 个处理器,所以 Go 语言程序的处理器数量一定会等于 GOMAXPROCS,这些处理器会绑定到不同的内核线程上。

1
2
3
4
5
6
7
8
9
10
type p struct {
m muintptr

// runqhead、runqtail 和 runq 三个字段表示处理器持有的运行队列,其中存储着待执行的 Goroutine 列表
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr // 线程下一个需要执行的 Goroutine
status uint32
}

P的状态(对应status字段)

状态描述
_Pidle处理器没有运行用户代码或者调度器,被空闲队列或者改变其状态的结构持有,运行队列为空
_Prunning被线程 M 持有,并且正在执行用户代码或者调度器
_Psyscall没有执行用户代码,当前线程陷入系统调用
_Pgcstop被线程 M 持有,当前处理器由于垃圾回收被停止
_Pdead当前处理器已经不被使用

调度器启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func schedinit() {
......
_g_ := getg()

sched.maxmcount = 10000 // 最多可以创建 10000 个线程,但是可以同时运行的线程还是由 GOMAXPROCS 变量控制
......
lock(&sched.lock) // 锁定调度器,整个程序不执行任何用户Goroutine
sched.lastpoll = uint64(nanotime())
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procresize(procs) != nil { // 更新程序中处理器的数量
throw("unknown runnable goroutine during bootstrap")
}
unlock(&sched.lock)
......
}

调用 runtime.procresize 是调度器启动的最后一步,在这一步过后调度器会完成相应数量处理器的启动,等待用户创建运行新的 Goroutine 并为 Goroutine 调度处理器资源。

创建Goroutine

通过关键字go启动一个新的Goroutine来执行任务。编译器将该关键字转为runtime.newproc函数调用,然后调用 runtime.newproc1 函数获取新的 Goroutine 结构体、将其加入处理器的运行队列并在满足条件时调用 runtime.wakep 唤醒新的处理执行 Goroutine:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func newproc(siz int32, fn *funcval) {
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
gp := getg()
pc := getcallerpc()
systemstack(func() {
newg := newproc1(fn, argp, siz, gp, pc)

_p_ := getg().m.p.ptr()
runqput(_p_, newg, true) // 将新创建的Goroutine newg设置到处理器的runnext,作为下一个处理器执行的任务

if mainStarted {
wakep()
}
})
}

runtime.newproc1 会根据传入参数创建一个状态为_Grunnable的 g 结构体,分为以下几个部分:

  1. 获取或者创建新的 Goroutine 结构体;
  2. 将传入的参数移到 Goroutine 的栈上;
  3. 更新 Goroutine 调度相关的属性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
// 1. 获取或创建Goroutine结构体:先在gFree列表中查找空闲Goroutine;如果不存在空闲Goroutine,通过 runtime.malg 创建一个栈大小足够的新结构体
_g_ := getg()
siz := narg
siz = (siz + 7) &^ 7

_p_ := _g_.m.p.ptr()
newg := gfget(_p_)
if newg == nil {
newg = malg(_StackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg)
}
...
// 2. 将fn函数的所有参数拷贝到栈上
totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
sp := newg.stack.hi - totalSize
spArg := sp
if narg > 0 { // argp 和 narg 分别是参数的内存空间和大小
memmove(unsafe.Pointer(spArg), argp, uintptr(narg)) // 将参数对应的内存空间整块拷贝到栈上
......
}
// 3. 设置新的 Goroutine 结构体的参数:包括栈指针、程序计数器并更新其状态到 _Grunnable 并返回
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
casgstatus(newg, _Gdead, _Grunnable)
newg.goid = int64(_p_.goidcache)
_p_.goidcache++
return newg
}

初始化结构体(runtime.gfget

调用链:newproc->newproc1->gfget(+malg)

runtime.gfget通过两种不同方式获取新的runtime.g

  1. Goroutine 所在处理器的 gFree 列表,或者调度器的 sched.gFree 列表中获取 runtime.g
    1. 当处理器的gFree列表为空时:将调度器持有的空闲 Goroutine 转移到当前处理器上,直到 gFree 列表中的 Goroutine 数量达到32;
    2. 当处理器的 Goroutine 数量充足时:从列表头部返回一个新的 Goroutine;
  2. 调用 runtime.malg 生成一个新的 runtime.g,并将结构体追加到全局的 Goroutine 列表 allgs 中。

设置调度信息sched

调用链:newproc->newproc1->...

1
2
3
newg.sched.pc = funcPC(goexit) + sys.PCQuantum      // 程序计数器设置为:runtime.goexit
newg.sched.g = guintptr(unsafe.Pointer(newg)) // Goroutine设置为:新创建的Goroutine 运行的函数
gostartcallfn(&newg.sched, fn)

运行队列(runqput

调用链:newproc->runqput

runtime.runqput 会将 Goroutine 放到运行队列上,这既可能是全局的运行队列,也可能是处理器本地的运行队列:

  1. nexttrue时:将 Goroutine 设置到处理器的 runnext 作为下一个处理器执行的任务;
  2. nextfalse且本地运行队列还有剩余空间时:将 Goroutine 加入处理器持有的本地运行队列;
  3. nextfalse且处理器的本地运行队列已经没有剩余空间时:把本地队列中的一部分 Goroutine 和待加入的 Goroutine 通过 runtime.runqputslow 添加到调度器持有的全局运行队列上;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func runqput(_p_ *p, gp *g, next bool) {
if randomizeScheduler && next && fastrand()%2 == 0 {
next = false
}

if next {
retryNext:
oldnext := _p_.runnext
if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
goto retryNext
}
if oldnext == 0 {
return
}
// Kick the old runnext out to the regular run queue.
gp = oldnext.ptr()
}

retry:
h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
t := _p_.runqtail
if t-h < uint32(len(_p_.runq)) {
_p_.runq[t%uint32(len(_p_.runq))].set(gp)
atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
return
}
if runqputslow(_p_, gp, h, t) {
return
}
// the queue is not full, now the put above must succeed
goto retry
}

两个运行队列辨析:处理器本地的运行队列 & 调度器持有的全局运行队列 仅当本地运行队列没有剩余空间时,才使用全局队列。

调度循环:查找可执行的Goroutine

调度器启动之后,Go 语言运行时会调用 runtime.mstart 以及 runtime.mstart1:前者初始化 g0stackguard0stackguard1 字段;后者初始化线程并调用 runtime.schedule 进入调度循环,采用三种方法查找待执行的Goroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func schedule() {
_g_ := getg()

top:
var gp *g
var inheritTime bool

if gp == nil {
// 1.1 当全局运行队列中有待执行的 Goroutine 时:通过 schedtick 保证有一定几率会从全局的运行队列中查找对应的 Goroutine;
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// 1.2 从处理器本地的运行队列中查找待执行的 Goroutine;
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
}
// 1.3 通过 runtime.findrunnable 进行阻塞地查找 Goroutine
if gp == nil {
gp, inheritTime = findrunnable()
}
// 2. 执行获取的Goroutine:通过 runtime.gogo 将 Goroutine 调度到当前线程上
execute(gp, inheritTime)
}

runtime.gogo在不同处理器架构上的实现不同,

  1. 函数调用时,模仿CALL过程的几个关键指令如下:
1
2
3
MOVL gobuf_sp(BX), SP  // 将 runtime.goexit 函数的 PC 恢复到 SP 中
MOVL gobuf_pc(BX), BX // 将待执行函数的程序计数器放到寄存器BX上
JMP BX // 开始执行

函数调用使用CALL指令:将调用方的返回地址加入栈寄存器SP,跳转到目标函数;当目标函数返回后,会从栈中查找调用的地址,并跳转回调用方继续执行剩下的代码。

  1. 从 Goroutine 中运行的函数返回时,跳转到runtime.goexit所在位置执行该函数:
1
2
3
4
5
6
TEXT runtime·goexit(SB),NOSPLIT,$0-0
CALL runtime·goexit1(SB)

func goexit1() {
mcall(goexit0)
}

最终在当前线程的 g0 的栈上调用 runtime.goexit0 函数,该函数会将 Goroutine 转换为 _Gdead 状态、清理其中的字段、移除 Goroutine 和线程的关联并调用 runtime.gfput 重新加入处理器的 Goroutine 空闲列表 gFree

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func goexit0(gp *g) {
_g_ := getg()
// 1. 将 Goroutine 转换为 `_Gdead` 状态
casgstatus(gp, _Grunning, _Gdead)
// 2. 清理字段
gp.m = nil
...
gp.param = nil
gp.labels = nil
gp.timer = nil
// 3. 移除 Goroutine 和线程的关联
dropg()
// 4. 将gp重新加入处理器的 Goroutine 空闲列表 gFree
gfput(_g_.m.p.ptr(), gp)
// 5. 触发新一轮的Goroutine调度
schedule()
}

以上是Goroutine正常执行和退出的逻辑;多数情况下Goroutine执行过程中,会经历协作式/抢占式调度,让出线程使用权并等待调度器唤醒。

触发调度

找到runtime.schedule函数的调用方,也即所有触发调度的时间点:

除此之外,运行时还会在线程启动 runtime.mstart 和 Goroutine 执行结束 runtime.goexit0 触发调度。

主动挂起

调用链: runtime.gopark -> runtime.park_m

runtime.gopark 是触发调度最常见的方法,该函数会将当前 Goroutine 暂停,被暂停的任务不会放回运行队列,而是等待唤醒:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
......
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m)
}

最后一行通过runtime.mcall切换到g0的栈上调用runtime.park_m将当前 Goroutine 的状态从 _Grunning 切换至 _Gwaiting,调用 runtime.dropg 移除线程和 Goroutine 之间的关联,在这之后就可以调用 runtime.schedule 触发新一轮的调度了。

1
2
3
4
5
6
7
8
9
func park_m(gp *g) {
_g_ := getg()

casgstatus(gp, _Grunning, _Gwaiting)
dropg()

......
schedule()
}

当 Goroutine 等待的特定条件满足后,运行时会调用 runtime.goready 将因为调用 runtime.gopark 而陷入休眠的 Goroutine 唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func goready(gp *g, traceskip int) {
systemstack(func() {
ready(gp, traceskip, true)
})
}

func ready(gp *g, traceskip int, next bool) {
_g_ := getg()
// 1. 将准备就绪的 Goroutine 的状态切换至 _Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)
// 2. 将 gp 加入处理器的运行队列中,等待调度器的调度
runqput(_g_.m.p.ptr(), gp, next)
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
wakep()
}
}

系统调用

在系统调用前后,会调用运行时的runtime.entersyscallruntime.exitsyscall,这层包装能够在陷入系统调用前触发运行时的准备和清理工作。

准备工作:分离线程和处理器,释放锁

runtime.entersyscall 会在获取当前程序计数器和栈位置之后调用runtime.reentersyscall,完成 Goroutine 进入系统调用前的准备工作:使得处理器和线程分离,当前线程会陷入系统调用等待返回;在锁被释放后,会有其他Goroutine抢占处理器资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func reentersyscall(pc, sp uintptr) {
_g_ := getg()
_g_.m.locks++
_g_.stackguard0 = stackPreempt
_g_.throwsplit = true
// 1. 保存当前的程序计数器 PC 和栈指针 SP 中的内容;
save(pc, sp)
_g_.syscallsp = sp
_g_.syscallpc = pc
// 2. 将 Goroutine 的状态更新至 _Gsyscall;
casgstatus(_g_, _Grunning, _Gsyscall)

// 3. 将 Goroutine 的处理器和线程暂时分离并更新处理器的状态到 _Psyscall;
_g_.m.syscalltick = _g_.m.p.ptr().syscalltick
_g_.m.mcache = nil
pp := _g_.m.p.ptr()
pp.m = 0
_g_.m.oldp.set(pp)
_g_.m.p = 0
atomic.Store(&pp.status, _Psyscall)
if sched.gcwaiting != 0 {
systemstack(entersyscall_gcwait)
save(pc, sp)
}
// 4. 释放当前线程上的锁
_g_.m.locks--
}

恢复工作:为当前Goroutine重新分配资源

调用退出系统调用的函数runtime.exitsyscall为当前Goroutine重新分配资源,有两个不同的执行路径:

  1. 调用快速路径runtime.exitsyscallfast
    1. 如果Goroutine的原处理器处于_Psyscall状态:直接调用 wirep 将 Goroutine 与处理器进行关联;
    2. 如果调度器中存在闲置的处理器,会调用 runtime.acquirep 使用闲置的处理器处理当前 Goroutine;
  2. 调用runtime.exitsyscall0,将当前Goroutine切换至_Grunnable状态,并移除线程M和当前Goroutine的关联:
    1. 如果通过runtime.pidleget获取到闲置的处理器:在该处理器上执行Goroutine;
    2. 其他情况:将当前Goroutine放到全局的运行队列中,等待调度器的调度。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func exitsyscall() {
_g_ := getg()

oldp := _g_.m.oldp.ptr()
_g_.m.oldp = 0
if exitsyscallfast(oldp) {
_g_.m.p.ptr().syscalltick++
casgstatus(_g_, _Gsyscall, _Grunning)
...

return
}

mcall(exitsyscall0)
_g_.m.p.ptr().syscalltick++
_g_.throwsplit = false
}

协作式调度

runtime.Gosched函数主动让出处理器,允许其他Goroutine运行。该函数无法挂起Goroutine,调度器可能会将当前 Goroutine 调度到其他线程上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func Gosched() {
checkTimeouts()
mcall(gosched_m)
}

func gosched_m(gp *g) {
goschedImpl(gp)
}

func goschedImpl(gp *g) {
casgstatus(gp, _Grunning, _Grunnable)
dropg()
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)

schedule()
}

最终在g0的栈上调用runtime.goschedImpl,运行时更新Goroutine的状态到_Grunnable,让出当前处理器并将Goroutine重新放回全局队列。最后,调用runtime.schedule触发调度。

线程管理

Go的运行时通过调度器改变线程所有权,也提供runtime.LockOSThreadruntime.UnlockOSThread绑定Goroutine和线程。

线程生命周期

Go的运行时通过runtime.startm启动线程以执行处理器P:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
func startm(_p_ *p, spinning bool) {
......
if nmp == nil {
/*
没有可用的 M,我们必须释放 sched.lock,并调用 newm。然而,我们已经拥有一个 P 来分配给 M。

一旦释放了 sched.lock,另一个 G(例如,在系统调用中)可能会发现没有空闲的 P;同时 checkdead 可能会发现一个可以运行的 G,但是没有正在运行的 M,因为这个新的 M 还没有启动,从而导致一个表面上的死锁。

通过预分配新 M 的 ID 来避免这种情况,从而在我们释放 sched.lock 之前,将其标记为“正在运行”。这个新 M 最终将运行调度器以执行任何排队的 G。
*/
id := mReserveID()
unlock(&sched.lock)

var fn func()
if spinning {
fn = mspinning
}
newm(fn, _p_, id) // 创建新的线程M
// 释放P的所有权,让新的 M 开始执行任务
releasem(mp)
return
}
}
func newm1(mp *m) {
if iscgo {
...
}
newosproc(mp)
}

创建新线程时调用runtime.newosproc,在Linux平台上通过系统调用clone创建新的操作系统线程:

1
2
3
4
5
6
func newosproc(mp *m) {
stk := unsafe.Pointer(mp.g0.stack.hi)
...
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
...
}

使用系统调用 clone 创建的线程会在线程主动调用 exit、以及在传入的函数 runtime.mstart 返回时主动退出;runtime.mstart 会执行调用 runtime.newm 时传入的匿名函数 fn,完成了从线程创建到销毁的整个闭环。

参考

Go 语言设计与实现