Training script entry points:

  • pretrain_gpt.py - GPT model training
  • pretrain_bert.py - BERT model training
  • pretrain_mamba.py - Mamba model training
  • pretrain_t5.py - T5 model training
  • pretrain_vlm.py - vision-language model training
  • train_rl.py - reinforcement-learning training

For example, pretrain_gpt.py: the if __name__ == "__main__" block calls the megatron.training.pretrain function:

from megatron.training import pretrain

if __name__ == "__main__":
    ......
    pretrain(
        train_valid_test_datasets_provider,
        partial(model_provider, gpt_builder),
        ModelType.encoder_or_decoder,
        forward_step,
        args_defaults={'tokenizer_type': 'GPT2BPETokenizer'},
        extra_args_provider=add_modelopt_args if has_nvidia_modelopt else None,
        store=store,
    )

Starting from the Megatron training entry point, the pretrain function:

def pretrain(
    train_valid_test_dataset_provider,
    model_provider,
    model_type,
    forward_step_func,
    ......
):
    # 1. initialize Megatron
    initialize_megatron(
        extra_args_provider=extra_args_provider,
        args_defaults=args_defaults,
        get_embedding_ranks=get_embedding_ranks,
        get_position_embedding_ranks=get_position_embedding_ranks,
        store=store,
    )

    # 2. setup model, optimizer and lr schedule using the model_provider
    model, optimizer, opt_param_scheduler = setup_model_and_optimizer(
        model_provider, model_type, checkpointing_context=checkpointing_context
    )

    # 3. call train_valid_test_dataset_provider to get train/val/test datasets
    train_data_iterator, valid_data_iterator, test_data_iterator = (
        build_train_valid_test_data_iterators(train_valid_test_dataset_provider)
    )

    # 4. train the model using the forward_step_func
    iteration, num_floating_point_operations_so_far = train(
        forward_step_func,
        model,
        optimizer,
        opt_param_scheduler,
        train_data_iterator,
        valid_data_iterator,
        process_non_loss_data_func,
        config,
        checkpointing_context,
        non_loss_data_func,
    )

Initializing Megatron's distributed environment: initialize_megatron

initialize_megatron -> finish_mpu_init -> _initialize_distributed

-> 1. torch.distributed

-> 2. mpu.initialize_model_parallel()

1. PyTorch distributed initialization: initializing torch.distributed

def _initialize_distributed(get_embedding_ranks, get_position_embedding_ranks, store):
    """Initialize torch.distributed and mpu.

                |   Node1   |   Node2   |
    ____________| p1  | p2  | p3  | p4  |
    local_rank  | 0   | 1   | 0   | 1   |
    rank        | 0   | 1   | 2   | 3   |

    node: a physical node, i.e. one machine or one container. Two nodes in the figure.
    rank: the global index of a process. Four processes in the figure.
    local_rank: the index of a process within its node.
    torch.cuda.device_count(): number of GPUs usable on the node the current process runs on.
    device: the index of a GPU within a node.
    """
    args = get_args()
    device_count = torch.cuda.device_count()

    # Check whether torch.distributed is already initialized
    if torch.distributed.is_initialized():
        # Already initialized: read back rank and world_size
        args.rank = torch.distributed.get_rank()              # global index of the current process
        args.world_size = torch.distributed.get_world_size()  # total number of processes
    else:
        # Not initialized yet: do the initialization
        if device_count > 0:
            torch.cuda.set_device(args.local_rank)
            device_id = torch.device(f'cuda:{args.local_rank}')

        # Set the CUDA stream (used for CUDA Graph)
        if args.external_cuda_graph:
            torch.cuda.set_stream(torch.cuda.Stream())

        # Initialize the process group
        init_process_group_kwargs = {
            'backend': args.distributed_backend,
            'store': store,
            'world_size': args.world_size,  # total number of processes
            'rank': args.rank,              # global index of the current process
            'timeout': timedelta(minutes=args.distributed_timeout_minutes),  # max wait time per process
        }
        torch.distributed.init_process_group(**init_process_group_kwargs)
        inprocess_restart.maybe_force_nccl_backend_init(device_id)
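
As a side note on where args.rank / args.local_rank / args.world_size come from: launchers such as torchrun export them as environment variables. A minimal standalone sketch (not Megatron code; it only mirrors the pattern above) could look like this:

import os
from datetime import timedelta

import torch

# torchrun exports RANK, LOCAL_RANK and WORLD_SIZE, which play exactly the
# roles described in the docstring above.
rank = int(os.environ.get("RANK", 0))
local_rank = int(os.environ.get("LOCAL_RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))

if torch.cuda.device_count() > 0:
    torch.cuda.set_device(local_rank)  # bind this process to one GPU on its node

# With the default init_method "env://", MASTER_ADDR/MASTER_PORT (also set by
# torchrun) are used for the rendezvous.
torch.distributed.init_process_group(
    backend="nccl" if torch.cuda.is_available() else "gloo",
    rank=rank,
    world_size=world_size,
    timeout=timedelta(minutes=30),
)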

A short digression: PyTorch's init_process_group

The key step is the call to torch.distributed.init_process_group, which blocks until all processes have joined one large process group.

init_process_group supports two ways of initializing:

  1. Explicitly specify store, rank, and world_size;
  2. Specify init_method (a URL string) that tells the processes where/how to discover their peers.

If neither is given, init_method is assumed to be "env://". In other words, store and init_method are mutually exclusive.

Some of the key parameters of init_process_group:

  1. backend: valid values are mpi, gloo (CPU), and nccl (GPU). (With NCCL, each process must have exclusive access to its GPU; sharing a GPU between processes easily leads to deadlock.)
  2. store: a key/value store accessible to all workers, used to exchange connection/address information;
  3. timeout: timeout for operations executed against the process group. The default is 30 minutes (this applies to the gloo backend); for nccl it only applies when the environment variable NCCL_BLOCKING_WAIT or NCCL_ASYNC_ERROR_HANDLING is set to 1.

The (simplified) source of init_process_group:
def init_process_group(backend,
                       init_method=None,
                       timeout=default_pg_timeout,
                       world_size=-1,
                       rank=-1,
                       store=None,
                       group_name='',
                       pg_options=None):

    backend = Backend(backend)
    if backend == Backend.MPI:
        default_pg = _new_process_group_helper(...)
        _update_default_pg(default_pg)
    else:
        if store is None:
            # Non-MPI backends: if no store is given, use init_method to pick a
            # _rendezvous_handler, which builds a store
            rendezvous_iterator = rendezvous(
                init_method, rank, world_size, timeout=timeout
            )
            store, rank, world_size = next(rendezvous_iterator)
            store.set_timeout(timeout)

        default_pg = _new_process_group_helper(...)
        _update_default_pg(default_pg)

The DDP module currently supports three initialization methods:

  1. Read the configuration from environment variables (set on every machine; all processes connect to the rank-0 process):
    • MASTER_PORT: port on the machine hosting the rank-0 process;
    • MASTER_ADDR: IP address of the machine hosting the rank-0 process;
    • WORLD_SIZE: total number of processes, so the master knows how many workers to wait for;
    • RANK: the rank of each process, so a process knows whether it is the master.
  2. Shared filesystem: init_method='file:///mnt/nfs/sharedfile', accessible to all processes;
  3. TCP: init_method='tcp://10.1.1.20:23456', giving the IP and port of the rank-0 process; all workers connect to it.

store is the distributed KV store provided by the distributed package and is accessible to all workers (an init_method essentially just creates a store). There are currently three KV store implementations: TCPStore, FileStore, HashStore.

The procedure by which all processes find each other and establish communication is called rendezvous.

FileStore:

def _file_rendezvous_handler(url: str, **kwargs):
    ......
    store = FileStore(path, world_size)

TCPStore:

def _tcp_rendezvous_handler(url: str, timeout: timedelta = default_pg_timeout, **kwargs):
    ......
    store = TCPStore(result.hostname, result.port, world_size, start_daemon, timeout)
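
To make the "init_method is essentially just a way to build a store" point concrete, here is a hedged sketch (addresses and ports are placeholders) of the two equivalent ways to reach the same TCP rendezvous:

import os
from datetime import timedelta

import torch.distributed as dist

rank = int(os.environ.get("RANK", 0))
world_size = int(os.environ.get("WORLD_SIZE", 1))

# Option A: let init_process_group build the TCPStore from a tcp:// URL.
# dist.init_process_group("gloo", init_method="tcp://10.1.1.20:23456",
#                         rank=rank, world_size=world_size)

# Option B: build the TCPStore explicitly and pass it in; the rendezvous is the same.
store = dist.TCPStore("10.1.1.20", 23456, world_size,
                      is_master=(rank == 0), timeout=timedelta(minutes=30))
dist.init_process_group("gloo", store=store, rank=rank, world_size=world_size)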

The init_process_group flow:

  1. Call rendezvous to get an iterator; rendezvous picks a _rendezvous_handler based on init_method and returns a store:

    rendezvous_iterator = rendezvous(
        init_method, rank, world_size, timeout=timeout
    )
    store, rank, world_size = next(rendezvous_iterator)

  2. Use the store to initialize the process group:

    default_pg = _new_process_group_helper(
        world_size,
        rank,
        [],
        backend,
        store,
        pg_options=pg_options,
        group_name=group_name,
        timeout=timeout)
    _update_default_pg(default_pg)

    where:

    def _new_process_group_helper(...):
        backend = Backend(backend)
        pg: Union[ProcessGroupGloo, ProcessGroupMPI, ProcessGroupNCCL]
        if backend == Backend.MPI:  # does not use a store
            pg = ProcessGroupMPI.create(group_ranks)
            if not pg:
                return GroupMember.NON_GROUP_MEMBER
            _pg_map[pg] = (Backend.MPI, None)
            _pg_names[pg] = group_name
        else:  # uses a store
            ......
            prefix_store = PrefixStore(group_name, store)
            if backend == Backend.GLOO:
                pg = ProcessGroupGloo(
                    prefix_store,  # the process group is built on a PrefixStore
                    rank,
                    world_size,
                    timeout=timeout)
                _pg_map[pg] = (Backend.GLOO, store)
                _pg_names[pg] = group_name
            elif backend == Backend.NCCL:
                pg = ProcessGroupNCCL(...)
            else:
                pg = getattr(Backend, backend.upper())(
                    prefix_store,
                    rank,
                    world_size,
                    timeout)

2. Model-parallel initialization: calling mpu.initialize_model_parallel()

Once the big global process group is set up, the finer-grained process subgroups can be created.

mpu.initialize_model_parallel(
    args.tensor_model_parallel_size,            # number of processes in each TP group
    args.pipeline_model_parallel_size,          # number of processes in each PP group
    args.virtual_pipeline_model_parallel_size,  # number of processes in each virtual PP group
    ......
)
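
For instance, with the (assumed) settings --tensor-model-parallel-size 2 and --pipeline-model-parallel-size 4 and no virtual pipeline, the call boils down to:

mpu.initialize_model_parallel(
    tensor_model_parallel_size=2,
    pipeline_model_parallel_size=4,
    virtual_pipeline_model_parallel_size=None,
)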

2.1 Argument validation and size computation

def initialize_model_parallel(
    tensor_model_parallel_size: int = 1,
    pipeline_model_parallel_size: int = 1,
    virtual_pipeline_model_parallel_size: Optional[int] = None,
    pipeline_model_parallel_comm_backend: Optional[str] = None,
    use_sharp: bool = False,
    context_parallel_size: int = 1,
    hierarchical_context_parallel_sizes: Optional[List[int]] = None,
    expert_model_parallel_size: int = 1,
    num_distributed_optimizer_instances: int = 1,
    expert_tensor_parallel_size: Optional[int] = None,
    nccl_communicator_config_path: Optional[str] = None,
    distributed_timeout_minutes: int = 30,
    order: str = "tp-cp-ep-dp-pp",
    get_embedding_ranks: Optional[Callable[[List[int], Optional[int]], List[int]]] = None,
    get_position_embedding_ranks: Optional[Callable[[List[int], Optional[int]], List[int]]] = None,
    create_gloo_process_groups: bool = True,
    high_priority_stream_groups: Optional[List[str]] = None,
    sharp_enabled_group: Optional[str] = None,
) -> None:

    # 1. Get world_size and rank
    assert torch.distributed.is_initialized()  # torch.distributed must already be initialized
    world_size: int = torch.distributed.get_world_size()  # total number of processes
    rank = torch.distributed.get_rank()                   # index of the current process

    # 2. Compute the model-parallel size and the data-parallel size
    # MP = TP * PP * CP
    # DP = world_size / (TP * PP * CP)
    model_size = tensor_model_parallel_size * pipeline_model_parallel_size * context_parallel_size
    if world_size % model_size != 0:
        raise RuntimeError(...)
    data_parallel_size: int = world_size // model_size
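
A quick worked example with assumed sizes: world_size = 16, TP = 2, PP = 2, CP = 2.

world_size = 16
tensor_model_parallel_size = 2
pipeline_model_parallel_size = 2
context_parallel_size = 2

# 2 * 2 * 2 = 8 ranks jointly hold one model replica
model_size = (tensor_model_parallel_size
              * pipeline_model_parallel_size
              * context_parallel_size)
# 16 // 8 = 2 replicas are trained in data parallel
data_parallel_size = world_size // model_size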

2.2 Creating the rank generators: RankGenerator

RankGenerator produces the rank assignment for each kind of parallel group according to the parallelism configuration. Its core method get_ranks(token) generates the rank groups for a given token (e.g. 'tp', 'dp', 'tp-pp'), using an orthogonal decomposition of the rank space so that the rank assignments of different parallel dimensions are orthogonal to one another.

decoder_rank_generator = RankGenerator(
    tp=tensor_model_parallel_size,    # tensor-parallel size
    ep=1,                             # expert parallelism (1 for non-expert layers)
    dp=data_parallel_size,            # data-parallel size
    pp=pipeline_model_parallel_size,  # pipeline-parallel size
    cp=context_parallel_size,         # context-parallel size
    order=order,                      # parallelism order
    rank_offset=0,
)

# Create the expert rank generator (used for MoE)
if expert_tensor_parallel_size is None:
    expert_tensor_parallel_size = tensor_model_parallel_size
expert_tensor_model_pipeline_parallel_size = (
    expert_tensor_parallel_size * expert_model_parallel_size * pipeline_model_parallel_size
)
expert_data_parallel_size = world_size // expert_tensor_model_pipeline_parallel_size

expert_decoder_rank_generator = RankGenerator(
    tp=expert_tensor_parallel_size,
    ep=expert_model_parallel_size,
    dp=expert_data_parallel_size,
    pp=pipeline_model_parallel_size,
    cp=1,
    order=order,
    rank_offset=0,
)

2.3 Creating the process groups

The process-group types include:

  1. Data-parallel group: _DATA_PARALLEL_GROUP
  2. Tensor-parallel group: _TENSOR_MODEL_PARALLEL_GROUP
  3. Pipeline-parallel group: _PIPELINE_MODEL_PARALLEL_GROUP
  4. Context-parallel group: _CONTEXT_PARALLEL_GROUP
  5. Expert-parallel group: _EXPERT_MODEL_PARALLEL_GROUP
  6. Embedding group: _EMBEDDING_GROUP
  7. Position-embedding group: _POSITION_EMBEDDING_GROUP

| Process group type      | Purpose                         | Communication pattern  | Backend   |
| ----------------------- | ------------------------------- | ---------------------- | --------- |
| Tensor-parallel group   | tensor-sharding communication   | All-Reduce, All-Gather | NCCL      |
| Pipeline-parallel group | passing data between stages     | Point-to-Point         | NCCL/UCC  |
| Data-parallel group     | gradient synchronization        | All-Reduce             | NCCL/Gloo |
| Context-parallel group  | sequence-sharding communication | All-Gather             | NCCL      |
| Expert-parallel group   | MoE expert communication        | All-Reduce             | NCCL      |
| Embedding group         | embedding-layer communication   | All-Gather             | NCCL      |

The groups are created through create_group, a wrapper for building PyTorch distributed process groups; create_group is a thin wrapper around torch.distributed.new_group.

  1. Create the data-parallel groups:

    # DP-CP
    global _DATA_PARALLEL_GROUP
    for ranks_with_cp in decoder_rank_generator.get_ranks('dp-cp'):
        group_with_cp = create_group(
            ranks_with_cp,
            timeout=timeout,
            pg_options=get_nccl_options("dp_cp", nccl_comm_cfgs),
            group_desc="DATA_PARALLEL_GROUP_WITH_CP",
        )
        ......
        if create_gloo_process_groups:
            group_with_cp_gloo = create_group(
                ranks_with_cp,
                timeout=timeout,
                backend="gloo",
                group_desc="DATA_PARALLEL_GROUP_WITH_CP_GLOO",
            )
        if rank in ranks_with_cp:
            _DATA_PARALLEL_GROUP_WITH_CP = group_with_cp
            _DATA_PARALLEL_GROUP_WITH_CP_GLOO = group_with_cp_gloo
            _DATA_PARALLEL_GLOBAL_RANKS_WITH_CP = ranks_with_cp

    # DP
    for ranks in decoder_rank_generator.get_ranks('dp'):
        group = create_group(
            ranks,
            timeout=timeout,
            pg_options=get_nccl_options("dp", nccl_comm_cfgs),
            group_desc="DATA_PARALLEL_GROUP",
        )
        ......
  2. Create the context-parallel groups:

    # CP
    for ranks in decoder_rank_generator.get_ranks('cp'):
        group = create_group(
            ......
            pg_options=get_nccl_options("cp", nccl_comm_cfgs),
            group_desc="CONTEXT_PARALLEL_GROUP",
        )
        if rank in ranks:
            _CONTEXT_PARALLEL_GROUP = group
            _CONTEXT_PARALLEL_GLOBAL_RANKS = ranks

        # Create hierarchical context-parallel groups (if specified)
        if hierarchical_context_parallel_sizes:
            hierarchical_groups, _ = create_hierarchical_groups(
                rank, ranks, hierarchical_context_parallel_sizes,
                create_gloo_process_groups=False,
                pg_options=get_nccl_options("hcp", nccl_comm_cfgs),
                timeout=timeout,
                group_desc="CONTEXT_PARALLEL_GROUP",
            )
            if rank in ranks:
                _HIERARCHICAL_CONTEXT_PARALLEL_GROUPS = hierarchical_groups
  3. Create the model-parallel groups:

    # TP-PP
    for ranks in decoder_rank_generator.get_ranks('tp-pp'):
        group = create_group(
            ......
            pg_options=get_nccl_options("mp", nccl_comm_cfgs),
            group_desc="MODEL_PARALLEL_GROUP",
        )
        if rank in ranks:
            _MODEL_PARALLEL_GROUP = group
            _MODEL_PARALLEL_GLOBAL_RANKS = ranks

    # TP
    for ranks in decoder_rank_generator.get_ranks('tp'):
        group = create_group(
            ......
            pg_options=get_nccl_options("tp", nccl_comm_cfgs),
            group_desc="TENSOR_MODEL_PARALLEL_GROUP",
        )
        if rank in ranks:
            _TENSOR_MODEL_PARALLEL_GROUP = group
            _TENSOR_MODEL_PARALLEL_GLOBAL_RANKS = ranks
  4. Create the pipeline-parallel groups:

    for ranks in decoder_rank_generator.get_ranks('pp'):
        group = create_group(
            ......
            backend=pipeline_model_parallel_comm_backend,
            pg_options=(
                None if pipeline_model_parallel_comm_backend == "ucc"
                else get_nccl_options("pp", nccl_comm_cfgs)
            ),
            group_desc="PIPELINE_MODEL_PARALLEL_GROUP",
        )
        if rank in ranks:
            if _PIPELINE_MODEL_PARALLEL_GROUP is None:
                _PIPELINE_MODEL_PARALLEL_GROUP = group
                _PIPELINE_GLOBAL_RANKS = ranks
            # Handle the case of multiple pipeline groups
            ......
  5. Create the embedding groups:

    # Create the embedding group
    embedding_ranks = get_embedding_ranks(ranks)
    group = create_group(
        embedding_ranks,
        timeout=timeout,
        pg_options=get_nccl_options("embd", nccl_comm_cfgs),
        group_desc="EMBEDDING_GROUP",
    )
    if rank in embedding_ranks:
        _EMBEDDING_GROUP = group
        _EMBEDDING_GLOBAL_RANKS = embedding_ranks

    # Create the position-embedding group
    position_embedding_ranks = get_position_embedding_ranks(ranks)
    group = create_group(
        position_embedding_ranks,
        timeout=timeout,
        pg_options=get_nccl_options("pos_embd", nccl_comm_cfgs),
        group_desc="POSITION_EMBEDDING_GROUP",
    )
    if rank in position_embedding_ranks:
        _POSITION_EMBEDDING_GROUP = group
        _POSITION_EMBEDDING_GLOBAL_RANKS = position_embedding_ranks
  6. Create the expert-parallel groups:

    # Create the expert model-parallel groups
    for ranks in expert_decoder_rank_generator.get_ranks('ep'):
        group = create_group(
            ranks,
            pg_options=get_nccl_options("ep", nccl_comm_cfgs),
            group_desc="EXPERT_MODEL_PARALLEL_GROUP",
        )
        if rank in ranks:
            _EXPERT_MODEL_PARALLEL_GROUP = group

    # Create the expert tensor-parallel groups
    for ranks in expert_decoder_rank_generator.get_ranks('tp'):
        group = create_group(
            ranks,
            timeout=timeout,
            pg_options=get_nccl_options("ep_tp", nccl_comm_cfgs),
            group_desc="EXPERT_TENSOR_PARALLEL_GROUP",
        )
        if rank in ranks:
            _EXPERT_TENSOR_PARALLEL_GROUP = group
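
Note the pattern that all of the loops above share: every rank iterates over every group and calls create_group, but caches only the group it belongs to. This matches the torch.distributed.new_group contract, which requires the call to be made by all processes of the default group, even by ranks that are not members of the group being created. A minimal sketch of the same pattern built directly on torch.distributed (the TP-only layout here is illustrative, not Megatron's):

import torch.distributed as dist

_TENSOR_MODEL_PARALLEL_GROUP = None  # illustrative module-level cache

def build_tp_groups(rank, world_size, tp_size):
    """Every rank must create every group; each rank keeps only its own."""
    global _TENSOR_MODEL_PARALLEL_GROUP
    for start in range(0, world_size, tp_size):
        ranks = list(range(start, start + tp_size))
        group = dist.new_group(ranks)  # collective call: executed on all ranks
        if rank in ranks:
            _TENSOR_MODEL_PARALLEL_GROUP = group  # cache only our own group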

The resulting partitions are stored in global variables such as _TENSOR_MODEL_PARALLEL_GROUP so that they can be used later when the model is actually sliced.

Accessor functions such as the following are defined:

def get_tensor_model_parallel_group(check_initialized=True):
    """Get the tensor-model-parallel group the caller rank belongs to."""
    if check_initialized:
        assert (
            _TENSOR_MODEL_PARALLEL_GROUP is not None
        ), "tensor model parallel group is not initialized"
    return _TENSOR_MODEL_PARALLEL_GROUP

This lets any process query its local_rank within its DP/TP/PP group as well as the world_size of the corresponding DP/TP/PP group.
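
Given the cached groups, those per-group queries reduce to standard torch.distributed calls. A hedged sketch of such helpers (Megatron's real accessors add caching and edge-case handling):

import torch.distributed as dist

def get_tensor_model_parallel_rank():
    """Index of the calling rank within its TP group (its local rank in that group)."""
    return dist.get_rank(group=get_tensor_model_parallel_group())

def get_tensor_model_parallel_world_size():
    """Number of ranks in the calling rank's TP group."""
    return dist.get_world_size(group=get_tensor_model_parallel_group())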

Why is an embedding_group needed?

In GPT-style models, the input layer and the output layer share one word_embedding. Therefore, after the gradients are computed and before the embedding weights are updated, the input and output layers must communicate so that the word_embedding stays exactly identical on both. In other words, the first and the last process of a PP group need to communicate. Since the purpose of process subgroups is precisely to carve out finer communication groups, an extra embedding_group is added here.
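
As a hedged sketch of what that synchronization amounts to (names are illustrative placeholders, not Megatron's exact code): after the backward pass, the ranks in the embedding group all-reduce the gradient of the tied word embedding so that both copies receive the identical update.

import torch.distributed as dist

def allreduce_shared_word_embedding_grad(word_embedding_weight, embedding_group):
    """Sum the word-embedding gradient across the first and last PP stages
    (the members of the embedding group) before the optimizer step.
    Both arguments are illustrative placeholders."""
    if word_embedding_weight.grad is not None:
        dist.all_reduce(word_embedding_weight.grad, group=embedding_group)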

In practice, one can also follow the DeepSpeed-Megatron approach and use ZeRO-R within the TP group to reduce the activation memory footprint. See: https://www.deepspeed.ai/tutorials/megatron/

3. A closer look at RankGenerator

If communication has to cross machines, it is preferable to let PP cross machines first, then DP, and TP last. Consequently, when the groups are formed, the ranks within a PP group have the largest stride, DP groups a medium one, and TP groups the smallest.

# The RankGenerator created inside ProcessMesh
self._rank_generator = RankGenerator(
    tp=self._tensor_model_parallel_size,
    ep=1,
    dp=self._data_parallel_size,
    pp=self._pipeline_model_parallel_size,
    cp=self._context_parallel_size,
    order=self._order,  # defaults to "tp-cp-ep-dp-pp"
)

The key order parameter "tp-cp-ep-dp-pp" defines the priority of the grouping and ensures that:

  1. TP groups have the smallest rank stride: tp comes first
  2. DP groups have a medium rank stride: dp sits in the middle
  3. PP groups have the largest rank stride: pp comes last

The grouping algorithm is invoked through:

def generator_wrapper(group_type, is_expert=False, **kwargs):
    """The `RankGenerator` class produces a hyper-rectangle for a given set
    of tensor, pipeline, data, expert, and context parallelism."""
    if is_expert:
        d_ranks = expert_decoder_rank_generator.get_ranks(group_type, **kwargs)
    else:
        d_ranks = decoder_rank_generator.get_ranks(group_type, **kwargs)

    # Yield the rank groups for the given group_type
    for x in d_ranks:
        yield x

An example:

Suppose there are 16 GPUs configured with TP = 4, PP = 2, DP = 2. With the order "tp-cp-ep-dp-pp", RankGenerator produces the following groups (a minimal sketch that reproduces this grouping follows the list):

  1. TP groups (smallest rank stride, 4 groups of 4 ranks):
    • [0, 1, 2, 3]
    • [4, 5, 6, 7]
    • [8, 9, 10, 11]
    • [12, 13, 14, 15]
  2. DP groups (medium rank stride, 8 groups of 2 ranks):
    • [0, 4], [1, 5], [2, 6], [3, 7]
    • [8, 12], [9, 13], [10, 14], [11, 15]
  3. PP groups (largest rank stride, 8 groups of 2 ranks):
    • [0, 8], [1, 9], [2, 10], [3, 11]
    • [4, 12], [5, 13], [6, 14], [7, 15]
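
The sketch below re-derives this grouping from scratch; it is an illustration only (not Megatron's RankGenerator) and assumes cp = ep = 1, so the effective order is "tp-dp-pp" with tp varying fastest across consecutive ranks and pp slowest:

from itertools import product

TP, DP, PP = 4, 2, 2  # 16 ranks in total

def rank_of(tp, dp, pp):
    # tp varies fastest, pp slowest, matching the order "tp-cp-ep-dp-pp"
    return tp + TP * dp + TP * DP * pp

def groups(token):
    """Enumerate the rank groups for 'tp', 'dp' or 'pp'."""
    sizes = {'tp': TP, 'dp': DP, 'pp': PP}
    others = [d for d in ('pp', 'dp', 'tp') if d != token]  # fixed dims, slowest first
    result = []
    for vals in product(*(range(sizes[d]) for d in others)):
        coords = dict(zip(others, vals))
        result.append([rank_of(**{**coords, token: i}) for i in range(sizes[token])])
    return result

print(groups('tp'))  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]]
print(groups('dp'))  # [[0, 4], [1, 5], [2, 6], [3, 7], [8, 12], [9, 13], [10, 14], [11, 15]]
print(groups('pp'))  # [[0, 8], [1, 9], [2, 10], [3, 11], [4, 12], [5, 13], [6, 14], [7, 15]]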

In terms of communication volume, the three parallelism schemes generally rank TP > DP > PP. How should we understand this?

With DP, every layer of the model needs gradient communication; with PP, only the layers adjacent to a stage boundary communicate, passing the activation x in the forward pass and its gradient in the backward pass. Looking at a single boundary layer, PP > DP.

Taken as a whole, however, both the total communication volume and the communication frequency satisfy PP < DP. Still, if the PP stages are sliced very finely, the extra forward-pass communication of x may make it not worthwhile overall.
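
A back-of-envelope comparison under assumed sizes (7B parameters, DP = 8, micro-batch 1, sequence length 4096, hidden size 4096, 32 micro-batches per step; a ring all-reduce moves roughly 2·P·(d-1)/d elements per rank):

P = 7e9                    # number of parameters (assumed)
d = 8                      # data-parallel size (assumed)
b, s, h = 1, 4096, 4096    # micro-batch size, sequence length, hidden size (assumed)
num_microbatches = 32

# DP: one gradient all-reduce per rank per step
dp_elems_per_rank = 2 * P * (d - 1) / d

# PP: one activation forward and one activation gradient backward
# per micro-batch at each stage boundary
pp_elems_per_boundary = 2 * b * s * h * num_microbatches

print(f"DP all-reduce per rank per step: {dp_elems_per_rank / 1e9:.2f}G elements")
print(f"PP p2p per boundary per step:    {pp_elems_per_boundary / 1e9:.2f}G elements")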

References

Github: NVIDIA/Megatron-LM

Megatron Core User Guide

CodeGeeX: A Pre-Trained Model for Code Generation with Multilingual Benchmarking on HumanEval-X

[源码解析] 模型并行分布式训练Megatron (2) --- 整体架构

[源码解析] PyTorch分布式(6) ---DistributedDataParallel -- init_method&store