并行训练系列:5. Megatron 之分布式环境初始化
训练脚本入口点:
pretrain_gpt.py
- GPT模型训练pretrain_bert.py
- BERT模型训练pretrain_mamba.py
- Mamba模型训练pretrain_t5.py
- T5模型训练pretrain_vlm.py
- 视觉语言模型训练train_rl.py
- 强化学习训练
例如 pretrain_gpt.py
:if name == "main": -> 调用 megatron.training.pretrain 函数
1 | from megatron.training import pretrain |
从 Megatron 训练的代码入口 pretrain
函数进入:
1 | def pretrain( |
初始化 Megatron 的分布式环境:initialize_megatron
initialize_megatron
-> finish_mpu_init
-> _initialize_distributed
-> 1. torch.distributed
-> 2. mpu.initialize_model_parallel()
1. Pytorch 分布式初始化:初始化 torch.distributed
1 | def _initialize_distributed(get_embedding_ranks, get_position_embedding_ranks, store): |
一点拓展:Pytorch 的 init_process_group
关键是调用torch.distributed
的init_process_group
方法,会阻塞直到所有进程加入一个大的进程 group。
init_process_group
有两种初始化方法:
- 明确指定
store
,rank
和world_size
; - 指定
init_method
(一个 URL 字符串),指示在哪里/如何发现对等点;
如果两者都没有指定,init_method
则假定为“env://
”。即 store
和 init_method
是互斥的。
init_process_group
的部分关键参数如下:
- 后端:有效值包括 mpi, gloo(CPU) 和 nccl(gpu) 共3种方式。(如果使用 NCLL,每个进程必须对 GPU 有独占访问权限;进程之间不宜共享 GPU,容易造成死锁)
- store:所有 worker 都可以访问的键/值存储,用于交换连接/地址信息;
- timeout:针对进程组执行的操作超时。默认值等于 30 分钟(适用于 gloo 后端);对于 nccl,仅在环境变量
NCCL_BLOCKING_WAIT
或NCCL_ASYNC_ERROR_HANDLING
设置为1时适用;
1 | def init_process_group(backend, |
目前DDP模块支持三种初始化方式:
- 从环境变量读取配置(在所有机器上设置,所有进程都连接到 rank0 进程):
MASTER_PORT
:rank 0 进程的机器上的端口;MASTER_ADDR
:rank 0 进程的机器上的 IP 地址;WORLD_SIZE
: 进程总数,master 知道要等待多少 worker;RANK
: 每个进程的rank,进程知道自己是否是 master
- 共享文件系统:
init_method='file:///mnt/nfs/sharedfile'
,所有进程均可访问; - TCP:
init_method='tcp://10.1.1.20:23456'
,提供 rank 0 进程的 IP 和端口,所有 worker 均可连接到 rank 0 进程。
store 是分布式包提供的分布式 KV 存储,所有 workers 均可访问(init_method
的本质是创建一个 store)。当前有3种 KV 存储:TCPStore
, FileStore
, HashStore
。
所有进程找到彼此并通信的过程称为 rendezvous:
FileStore
:
1 | def _file_rendezvous_handler(url: str, **kwargs): |
TCPStore
:
1 | def _tcp_rendezvous_handler(url: str, timeout: timedelta = default_pg_timeout, **kwargs): |
init_process_group
流程:
调用
rendezvous
获取一个迭代器;rendezvous
根据init_method
选择一个_rendezvous_handler
,返回一个store
;1
2
3
4rendezvous_iterator = rendezvous(
init_method, rank, world_size, timeout=timeout
)
store, rank, world_size = next(rendezvous_iterator)使用
store
初始化进程组:1
2
3
4
5
6
7
8
9
10default_pg = _new_process_group_helper(
world_size,
rank,
[],
backend,
store,
pg_options=pg_options,
group_name=group_name,
timeout=timeout)
_update_default_pg(default_pg)其中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28def _new_process_group_helper(...):
backend = Backend(backend)
pg: Union[ProcessGroupGloo, ProcessGroupMPI, ProcessGroupNCCL]
if backend == Backend.MPI: # 没有使用 store
pg = ProcessGroupMPI.create(group_ranks)
if not pg:
return GroupMember.NON_GROUP_MEMBER
_pg_map[pg] = (Backend.MPI, None)
_pg_names[pg] = group_name
else: # 使用 store
......
prefix_store = PrefixStore(group_name, store)
if backend == Backend.GLOO:
pg = ProcessGroupGloo(
prefix_store, # 使用 PrefixStore 构建进程组
rank,
world_size,
timeout=timeout)
_pg_map[pg] = (Backend.GLOO, store)
_pg_names[pg] = group_name
elif backend == backend.NCCL:
pg = ProcessGroupNCCL(...)
else:
pg = getattr(Backend, backend.upper())(
prefix_store,
rank,
world_size,
timeout)
2. 模型并行初始化:调用 mpu.initialize_model_parallel()
设置完大的进程 group 后,可以设置细分的进程 subgroup.
1 | mpu.initialize_model_parallel( |
2.1 参数验证和计算
1 | def initialize_model_parallel( |
2.2 创建 rank 生成器:RankGenerator
RankGenerator
根据并行配置生成各种并行组的 rank 分配。核心方法get_ranks(token)
根据 token(如'tp', 'dp', 'tp-pp')生成对应的 rank 组(使用正交并行算法,确保不用并行维度之间的 rank 分配是正交的)
1 | decoder_rank_generator = RankGenerator( |
2.3 创建进程组
进程组类型包括:
- 数据并行组:
_DATA_PARALLEL_GROUP
- 张量并行组:
_TENSOR_MODEL_PARALLEL_GROUP
- 管道并行组:
_PIPELINE_MODEL_PARALLEL_GROUP
- 上下文并行组:
_CONTEXT_PARALLEL_GROUP
- 专家并行组:
_EXPERT_MODEL_PARALLEL_GROUP
- 嵌入组:
_EMBEDDING_GROUP
- 位置嵌入组:
_POSITION_EMBEDDING_GROUP
进程组类型 | 用途 | 通信模式 | 后端 |
---|---|---|---|
张量并行组 | 张量切分通信 | All-Reduce, All-Gather | NCCL |
管道并行组 | 层间数据传递 | Point-to-Point | NCCL/UCC |
数据并行组 | 梯度同步 | All-Reduce | NCCL/Gloo |
上下文并行组 | 序列切分通信 | All-Gather | NCCL |
专家并行组 | MoE专家通信 | All-Reduce | NCCL |
嵌入组 | 嵌入层通信 | All-Gather | NCCL |
通过create_group
创建 Pytorch 分布式进程组的封装函数;create_group
是torch.distributed.new_group
的简化包装.
创建数据并行组:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31# DP-CP
global _DATA_PARALLEL_GROUP
for ranks_with_cp in decoder_rank_generator.get_ranks('dp-cp'):
group_with_cp = create_group(
ranks_with_cp,
timeout=timeout,
pg_options=get_nccl_options("dp_cp", nccl_comm_cfgs),
group_desc="DATA_PARALLEL_GROUP_WITH_CP",
)
......
if create_gloo_process_groups:
group_with_cp_gloo = create_group(
ranks_with_cp,
timeout=timeout,
backend="gloo",
group_desc="DATA_PARALLEL_GROUP_WITH_CP_GLOO",
)
if rank in ranks_with_cp:
_DATA_PARALLEL_GROUP_WITH_CP = group_with_cp
_DATA_PARALLEL_GROUP_WITH_CP_GLOO = group_with_cp_gloo
_DATA_PARALLEL_GLOBAL_RANKS_WITH_CP = ranks_with_cp
# DP
for ranks in decoder_rank_generator.get_ranks('dp'):
group = create_group(
ranks,
timeout=timeout,
pg_options=get_nccl_options("dp", nccl_comm_cfgs),
group_desc="DATA_PARALLEL_GROUP",
)
......创建上下文并行组
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22# CP
for ranks in decoder_rank_generator.get_ranks('cp'):
group = create_group(
......
pg_options=get_nccl_options("cp", nccl_comm_cfgs),
group_desc="CONTEXT_PARALLEL_GROUP",
)
if rank in ranks:
_CONTEXT_PARALLEL_GROUP = group
_CONTEXT_PARALLEL_GLOBAL_RANKS = ranks
# 创建分层上下文并行组(如果指定)
if hierarchical_context_parallel_sizes:
hierarchical_groups, _ = create_hierarchical_groups(
rank, ranks, hierarchical_context_parallel_sizes,
create_gloo_process_groups=False,
pg_options=get_nccl_options("hcp", nccl_comm_cfgs),
timeout=timeout,
group_desc="CONTEXT_PARALLEL_GROUP",
)
if rank in ranks:
_HIERARCHICAL_CONTEXT_PARALLEL_GROUPS = hierarchical_groups创建模型并行组:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20# TP-PP
for ranks in decoder_rank_generator.get_ranks('tp-pp'):
group = create_group(
......
pg_options=get_nccl_options("mp", nccl_comm_cfgs),
group_desc="MODEL_PARALLEL_GROUP",
)
if rank in ranks:
_MODEL_PARALLEL_GROUP = group
_MODEL_PARALLEL_GLOBAL_RANKS = ranks
# TP
for ranks in decoder_rank_generator.get_ranks('tp'):
group = create_group(
......
pg_options=get_nccl_options("tp", nccl_comm_cfgs),
group_desc="TENSOR_MODEL_PARALLEL_GROUP",
)
if rank in ranks:
_TENSOR_MODEL_PARALLEL_GROUP = group
_TENSOR_MODEL_PARALLEL_GLOBAL_RANKS = ranks创建管道并行组:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16for ranks in decoder_rank_generator.get_ranks('pp'):
group = create_group(
......
backend=pipeline_model_parallel_comm_backend,
pg_options=(
None if pipeline_model_parallel_comm_backend == "ucc"
else get_nccl_options("pp", nccl_comm_cfgs)
),
group_desc="PIPELINE_MODEL_PARALLEL_GROUP",
)
if rank in ranks:
if _PIPELINE_MODEL_PARALLEL_GROUP is None:
_PIPELINE_MODEL_PARALLEL_GROUP = group
_PIPELINE_GLOBAL_RANKS = ranks
# 处理多个管道组的情况
......创建嵌入组:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23# 创建嵌入组
embedding_ranks = get_embedding_ranks(ranks)
group = create_group(
embedding_ranks,
timeout=timeout,
pg_options=get_nccl_options("embd", nccl_comm_cfgs),
group_desc="EMBEDDING_GROUP",
)
if rank in embedding_ranks:
_EMBEDDING_GROUP = group
_EMBEDDING_GLOBAL_RANKS = embedding_ranks
# 创建位置嵌入组
position_embedding_ranks = get_position_embedding_ranks(ranks)
group = create_group(
position_embedding_ranks,
timeout=timeout,
pg_options=get_nccl_options("pos_embd", nccl_comm_cfgs),
group_desc="POSITION_EMBEDDING_GROUP",
)
if rank in position_embedding_ranks:
_POSITION_EMBEDDING_GROUP = group
_POSITION_EMBEDDING_GLOBAL_RANKS = position_embedding_ranks创建专家并行组:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20# 创建专家模型并行组
for ranks in expert_decoder_rank_generator.get_ranks('ep'):
group = create_group(
ranks,
pg_options=get_nccl_options("ep", nccl_comm_cfgs),
group_desc="EXPERT_MODEL_PARALLEL_GROUP",
)
if rank in ranks:
_EXPERT_MODEL_PARALLEL_GROUP = group
# 创建专家张量并行组
for ranks in expert_decoder_rank_generator.get_ranks('tp'):
group = create_group(
ranks,
timeout=timeout,
pg_options=get_nccl_options("ep_tp", nccl_comm_cfgs),
group_desc="EXPERT_TENSOR_PARALLEL_GROUP",
)
if rank in ranks:
_EXPERT_TENSOR_PARALLEL_GROUP = group
将划分结果存储在_TENSOR_MODEL_PARALLEL_GROUP
等全局变量中,方便在后续切割时使用。
定义以下函数:
1 | def get_tensor_model_parallel_group(check_initialized=True): |
使得对于任意一个进程,都能查询到其在 DP/TP/PP 组中的 local_rank,和对应的 DP/TP/PP 组的 world_size.
为什么需要一个
embedding_group
?在GPT类模型中,输入层和输出层共享一个 word_embedding。因此,在计算完梯度,更新 embedding 权重前,输入和输出层需要进行通信,保证 word_embedding 完全一致。也即 PP 组中的第一个和最后一个进程需要通信。设置进程子组的目的是进一步划分通信组,因此这里再添加一个 embedding_group。
实际使用中,也可以采用DeepSpeed-Megatron的方式,使用 ZeRO-R 在 TP 组中针对 activation 做显存优化。参考:https://www.deepspeed.ai/tutorials/megatron/
3. 深入 RankGenerator
如果考虑跨机器通信,优先考虑 PP;其次 DP;最后 TP。因此先划分组的时候,PP的同组中元素跨度是最大的,DP其次,TP则是最小的。
1 | # 在 ProcessMesh 中创建 RankGenerator |
关键的order
参数"tp-cp-ep-dp-pp"定义了分组的优先级顺序,确保:
- TP 组内元素跨度最小:排在最前面
- DP 组内元素跨度中等:排在中间
- PP 组内元素跨度最大:排在最后
分组算法通过以下方式被调用:
1 | def generator_wrapper(group_type, is_expert=False, **kwargs): |
举一个栗子:
假设有16个 GPU,配置为:TP = 4,PP = 2,DP = 2;按照"tp-cp-ep-dp-pp" 的顺序,RankGenerator
会这样分组:
- TP 组(元素跨度最小):
- 组1: [0, 1, 2, 3]
- 组2: [4, 5, 6, 7]
- 组3: [8, 9, 10, 11]
- 组4: [12, 13, 14, 15]
- DP 组(元素跨度中等):
- 组1: [0, 4, 8, 12]
- 组2: [1, 5, 9, 13]
- PP 组(元素跨度最大):
- 组1: [0, 1, 2, 3, 4, 5, 6, 7]
- 组2: [8, 9, 10, 11, 12, 13, 14, 15]
三种并行方式的通信量,一般而言:TP>DP>PP,如何理解呢?
DP 模型每一层都需要梯度通信;PP 边界上相邻的两层 layer,需要 FWD 中的输入 x 通信,和 BWD 中的梯度通信。单层看 PP>DP。
整体来看,总通信量和通信频率 PP<DP。但如果 PP Stage 划分得非常密集,考虑到 FWD 中还有对 x 的通信,可能总体不划算。
参考
CodeGeeX: A Pre-Trained Model for Code Generation with Multilingual Benchmarking on HumanEval-X
[源码解析] 模型并行分布式训练Megatron (2) --- 整体架构
[源码解析] PyTorch分布式(6) ---DistributedDataParallel -- init_method&store