以运行VGG模型为例,启动脚本代码为:1
2
3
4python main_with_runtime.py --module models.vgg16.gpus=4_straight --recompute -b 32 --data_dir /data/DNN_Dataset/imagenet/full/pytorch-imagenet-data --rank 0 --local_rank 0 --master_addr localhost --config_path models/vgg16/gpus=4_straight/mp_conf.json --distributed_backend gloo 2>&1 >> vgg.log &
python main_with_runtime.py --module models.vgg16.gpus=4_straight --recompute -b 32 --data_dir /data/DNN_Dataset/imagenet/full/pytorch-imagenet-data --rank 1 --local_rank 1 --master_addr localhost --config_path models/vgg16/gpus=4_straight/mp_conf.json --distributed_backend gloo 2>&1 >> vgg.log &
python main_with_runtime.py --module models.vgg16.gpus=4_straight --recompute -b 32 --data_dir /data/DNN_Dataset/imagenet/full/pytorch-imagenet-data --rank 2 --local_rank 2 --master_addr localhost --config_path models/vgg16/gpus=4_straight/mp_conf.json --distributed_backend gloo 2>&1 >> vgg.log &
python main_with_runtime.py --module models.vgg16.gpus=4_straight --recompute -b 32 --data_dir /data/DNN_Dataset/imagenet/full/pytorch-imagenet-data --rank 3 --local_rank 3 --master_addr localhost --config_path models/vgg16/gpus=4_straight/mp_conf.json --distributed_backend gloo 2>&1 >> vgg.log &
runtime/image_classification/main_with_runtime.py
接受的参数
- module=models.vgg16.gpus=4
- recompute: 重计算
- b=32: batchsize
- master_addr=localhost: master节点的地址(多机设置待研究)
- data_dir=xxx: 数据集路径
- config_path=models/vgg16/gpus=4_straight/mp_conf.json:pipeline的构成
- rank=x:
- local_rank=x: (与全局rank的区别待研究)
- distributed_backend=gloo/nccl: DP模式下使用nccl,否则使用gloo。
解读
- 解析参数:在训练前插入参数输出,参见这里
- class SyntheticDataset(Line 107~117): 用于inceptionv3的虚拟数据集
- import module(Line 129~131): 相当于导入了
models/vgg16/gpus=4_straight/__init__.py
,并且得到了一个有5个module的model。 - dtypes(Line 139): 为何input0与target对应的是int64,input0的dtype似乎与line145行的指定dtype=torch.float32有矛盾。
- 生成每个stage对应的输入输出tensor大小(Line 133~155): 先指定input0的size,然后根据
runtime/image_classification/models/vgg16/gpus=4_straight/__init__.py:15~19
可以依次生成out0, out1, out2, out3对应的大小。 - runtime.StageRuntime(Line 177~191):生成运行时信息,包括当前进程对应的stage, num_stages, num_ranks等等。对应变量的值已经打印出来,参考这里
define optimizer(Line 202~232):
num_versions
表示optimizer需要保存的参数的版本数量,在gpus=4_straight组别中,实际输出:| rank | num_versions |
| —— | —————— |
| 0 | 4 |
| 1 | 3 |
| 2 | 2 |
| 3 | 1 |Load data(Line 236~298)
- 进入train() (Line 313)
- train::switch to train mode(Line 339 345): 需要设置r的一些变量。(TODO)
train::warmup (Line 359~361):
| rank | num_warmup_minibatches |
| —— | ——————————— |
| 0 | 3 |
| 1 | 2 |
| 2 | 1 |
| 3 | 0 |对应的是如下红框阶段:
train::static (Line 363~413): forward — backward — forward — backward …
- train::end (Line 415~421): 若干个backward
runtime/image_classification/models/vgg16/gpus=4_straight/mp_conf.json
原文内容如下:1
2
3
4{
"module_to_stage_map": [0, 1, 2, 3, 3],
"stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]}
}
什么是module?
参见
runtime/image_classification/models/vgg16/gpus=4_straight/__init__.py
中的model
函数,里面返回了一个list,包含5个items,分别是:Stage0, Stage1, Stage2, Stage3, criterion(用于计算loss)。以上就是5个modules,把vgg模型划分成了5个部分。根据
module_to_stage_map
,有映射关系:1
2
3
4
5
6module --> stage --> rank
---------------------------
Stage0 --> stage0 --> rank0
Stage1 --> stage1 --> rank1
Stage2 --> stage2 --> rank2
Stage3, criterion --> stage3 --> rank3
runtime/optimizer.py
由main_with_runtime.py:222~229
中引用,原文如下:1
2
3
4
5
6
7
8optimizer = sgd.SGDWithWeightStashing(r.modules(), r.master_parameters,
r.model_parameters, args.loss_scale,
num_versions=num_versions,
lr=args.lr,
momentum=args.momentum,
weight_decay=args.weight_decay,
verbose_freq=args.verbose_frequency,
macrobatch=args.macrobatch)
SGDWithWeightStashing继承自optimizer.py
中的OptimizerWithWeightStashing(插入了一个optim_name)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17class SGDWithWeightStashing(OptimizerWithWeightStashing):
"""
SGD optimizer with weight stashing.
"""
def __init__(self, modules, master_parameters, model_parameters,
loss_scale, num_versions, lr=required, momentum=0,
dampening=0, weight_decay=0, nesterov=False, verbose_freq=0,
macrobatch=False):
super(SGDWithWeightStashing, self).__init__(
optim_name='SGD',
modules=modules, master_parameters=master_parameters,
model_parameters=model_parameters, loss_scale=loss_scale,
num_versions=num_versions, lr=lr, momentum=momentum,
dampening=dampening, weight_decay=weight_decay,
nesterov=nesterov, verbose_freq=verbose_freq,
macrobatch=macrobatch,
)
一些参数的值:
全部打印在这里
- macrobatch=false. 若为真,则最多只存储2个版本的参数。
- queue中存放num_versions个(params, version)对。queue[0]中存放最旧的参数版本,queue[-1]中存放最新的参数版本。
runtime/runtime.py
全部打印在这里
StageRuntime::initialize
接受的参数:
- model:
runtime/image_classification/models/vgg16/gpus=4_straight/__init__.py
中的model() - inputs_module_destinations: {“input”:0}
- configuration_maps:
runtime/image_classification/models/vgg16/gpus=4_straight/mp_conf.json
中的内容 - master_addr, rank, local_rank: 根据args确定
- num_ranks_in_server: 1
- self.target_tensor_names={“target”}
- `enable_compute`: 除了0是False,其他均为True。
处理过程:
- 初始化(Line 83~91): 其中
criterion_input_name = 'out3'
- 给每个tensor赋予一个tag,最后得到self.tensor_tags=
{'input0': 1, 'out0': 2, 'out1': 3, 'out2': 4, 'out3': 5, 'loss': 6, 'target': 7, 'ack': 8}
。注意,这里增加了两个额外的tensor,分别是target
与ack
。target
用于传递label,ack
待研究,应该与其名字作用相关TODO - configure_map及其衍生
- module_to_stage_map: 一对一
- stage_to_rank_map: 一对多,如果一个stage对应多个rank,则此stage在多个rank间做了replicas备份
- stage_to_depth: 暂时不用管
- stage_to_module_map: 一对多, 一个stage可能由多个module组成。例如stage3由Stage3+criterion组成。
- rank_to_stage_map: 一对一: 一个rank对应一个固定stage —> 对应可能多个module
- ranks_in_previous_stage
- num_ranks_in_previous_stage: 前者的长度
- ranks_in_next_stage
- num_ranks_in_next_stage
modules_with_dependencies的生成
modules: list of int, 当前rank对应的module id
| rank | modules | models |
| —— | ———- | ———————————————————————————————— |
| 0 | [0] | [(Stage0(), [“input0”], [“out0”])] |
| 1 | [1] | [(Stage1(), [“out0”], [“out1”])] |
| 2 | [2] | [(Stage2(), [“out1”], [“out2”])] |
| 3 | [3,4] | [(Stage3(), [“out2”], [“out3”]),(criterion, [“out3”], [“loss”])] |
comm_handler: TODO
- send_ranks, receive_ranks(Line 192~206, Line 208~222): 枚举,若module i的输出(tensor_name)在module j的输入中,并且i与j对应的stage不是同一个,则需要将tensor_name发送。’input’与’input0’有什么区别?(TODO)
- move to cuda(Line 224~229)
- 在replicas间做数据并行(Line 232~279):
- groups: groups[stageID]表示第stageID个stage的通信域。等某个stage对应多个rank的时候才会创建新的通信域(用于多replicas间的数据并行通信)
- modules[i]使用torch.nn.parallel.DistributedDataParallel()包装。
- 计算本机对应需要做数据并行的num_parameters, module_size.
StageRuntime::train
训练过程中的多版本tensor与weights的变化情况:
对comm_handler作初始化。
modules[i].train
: 参考这里,设置为train模式。
StageRuntime::run_forward
- 接受tensor
- 执行前向计算
- 发送tensors
- 更新信息: forward_minibatch_id+=1
StageRuntime::receive_tensors_forward
由于代码中没有调用set_loader
,所以self.loader_iter
is None。
Togo: 按我的理解,第0个stage应该有loader_iter,负责读取数据集输入?
- comm_handler.recv() (Line 415~420):
- comm_handler.increment_messaging_index() (Line 426~428): 处理replicas情形,暂不管。
runtime/communication.py
打印的值
基本流程概览:
接受的参数
- master_addr: localhost
- master_port: 12345
- rank, local_rank: same as in args
- num_ranks_in_server: 1
- world_size: 4
- fp16: False
- distributed_backend : gloo
__init__
- 调用
torch.distributed.init_process_group()
初始化了一个通信域。 - 由于传入的
num_ranks_in_server==1
所以直接return。
initialize
receive_ranks
: 不仅包括传入的中间变量,还包括target
。e.g.rank=2, comm_handler.receive_ranks={'out1': [1], 'target': [1]}
send_ranks
: 与receive_ranks
相对应。e.g.rank=1, comm_handler.send_ranks={'out1': [2], 'target': [2]}
tensor_tags
: 每个rank都一样,{'input0': 1, 'out0': 2, 'out1': 3, 'out2': 4, 'out3': 5, 'loss': 6, 'target': 7, 'ack': 8}
target_tensor_names
:{'target}
rank_in_stage
: 在replica情形下,在子数据并行模块中的rank。此处均为0.num_ranks_in_stage
: 此处均为1ranks_in_previous_stage
: 如名。num_ranks_in_previous_stage
: 上一项的长度ranks_in_next_stage
num_ranks_in_next_stage
setup_queues
- forward_receive_queues
- backward_receive_queues
- forward_send_queues
- backward_send_queues
- num_forward_threads
- num_backward_threads
- num_ack_threads
rank | FR | FS | BR | BS | NF | NB | NA |
---|---|---|---|---|---|---|---|
0 | target | out0, target | out0, ack | ack | 2 | 1 | 1 |
1 | out0, target | out1, target | out1, ack | out0, ack | 4 | 2 | 2 |
2 | out1, target | out2, target | out2, ack | out1, ack | 4 | 2 | 2 |
3 | out2, target | target | ack | out2,ack | 2 | 1 | 1 |
在执行(Line 130~146)时,receive_ranks={'out1':[1]}
, 在(Line 165~178)中会讲target
加入receive_ranks
。其余同理。
setup_messaging_schedule
应该是用于有replicas的情况下,应该传输给谁。
- fwd_messaging_scheduling_row(FR)
- fwd_messaging_scheduling_col(FC)
- bwd_messaging_scheduling_row(BR)
- bwd_messaging_scheduling_col(BC)
rank | FR | FC | BR | BC |
---|---|---|---|---|
0 | -1 | 0 | -1 | 0 |
1 | 0 | 0 | 0 | 0 |
2 | 0 | 0 | 0 | 0 |
3 | 0 | 0 | 0 | 0 |
create_process_groups
由于num_ranks_in_server=1
,所以直接跳过。
start_helper_threads
- 设置
counter
为总线程数量。当counter归零时,表示所有线程安全退出。 - 处理
ack
, 由于是train模式,所以讲ack删除, 所以后面还会用到ack吗? - 计算前向反向迭代次数,用于处理有replicas的情况(Line 269~273)
- dtype=torch.float32
- 跳过
target
或者ack
。 为什么这里要跳过target?
start_helper_thread
根据给的必要信息,启动send_helper_thread
或者recv_helper_thread
send_helper_thread_args
根据提供的tensor_name
,index
,backward:Bool
, num_iterations
,返回对应的(queue, self.counter, self.local_rank, tensor_name, self.rank, dst_rank, tag, sub_process_group, num_iterations)
由于ranks_in_server=[]
,所以is_gpu_to_gpu_comm
总是返回False,所以总是设置sub_process_group=None
send_helper_thread
tensor=queue.remove()
参照runtime/threadsafe_queue.py:20
。- 调用
_send
发送
_send
由于sub_process_group==None
,所以总是现将tensor复制到cpu再发送。
- 转到cpu
- 发送shape
- 发送tensor
_recv
同样经过cpu中转。
- 接受shape
- 根据shape创建同样大小的tensor
- 接受tensor
- 转到gpu
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。