PipeDream源码理解

以运行VGG模型为例,启动脚本代码为:

1
2
3
4
python main_with_runtime.py --module models.vgg16.gpus=4_straight --recompute -b 32 --data_dir /data/DNN_Dataset/imagenet/full/pytorch-imagenet-data --rank 0 --local_rank 0 --master_addr localhost --config_path models/vgg16/gpus=4_straight/mp_conf.json --distributed_backend gloo 2>&1 >> vgg.log &
python main_with_runtime.py --module models.vgg16.gpus=4_straight --recompute -b 32 --data_dir /data/DNN_Dataset/imagenet/full/pytorch-imagenet-data --rank 1 --local_rank 1 --master_addr localhost --config_path models/vgg16/gpus=4_straight/mp_conf.json --distributed_backend gloo 2>&1 >> vgg.log &
python main_with_runtime.py --module models.vgg16.gpus=4_straight --recompute -b 32 --data_dir /data/DNN_Dataset/imagenet/full/pytorch-imagenet-data --rank 2 --local_rank 2 --master_addr localhost --config_path models/vgg16/gpus=4_straight/mp_conf.json --distributed_backend gloo 2>&1 >> vgg.log &
python main_with_runtime.py --module models.vgg16.gpus=4_straight --recompute -b 32 --data_dir /data/DNN_Dataset/imagenet/full/pytorch-imagenet-data --rank 3 --local_rank 3 --master_addr localhost --config_path models/vgg16/gpus=4_straight/mp_conf.json --distributed_backend gloo 2>&1 >> vgg.log &

runtime/image_classification/main_with_runtime.py

接受的参数

  1. module=models.vgg16.gpus=4
  2. recompute: 重计算
  3. b=32: batchsize
  4. master_addr=localhost: master节点的地址(多机设置待研究)
  5. data_dir=xxx: 数据集路径
  6. config_path=models/vgg16/gpus=4_straight/mp_conf.json:pipeline的构成
  7. rank=x:
  8. local_rank=x: (与全局rank的区别待研究)
  9. distributed_backend=gloo/nccl: DP模式下使用nccl,否则使用gloo。

解读

  1. 解析参数:在训练前插入参数输出,参见这里
  2. class SyntheticDataset(Line 107~117): 用于inceptionv3的虚拟数据集
  3. import module(Line 129~131): 相当于导入了models/vgg16/gpus=4_straight/__init__.py,并且得到了一个有5个module的model。
  4. dtypes(Line 139): 为何input0与target对应的是int64,input0的dtype似乎与line145行的指定dtype=torch.float32有矛盾。
  5. 生成每个stage对应的输入输出tensor大小(Line 133~155): 先指定input0的size,然后根据runtime/image_classification/models/vgg16/gpus=4_straight/__init__.py:15~19可以依次生成out0, out1, out2, out3对应的大小。
  6. runtime.StageRuntime(Line 177~191):生成运行时信息,包括当前进程对应的stage, num_stages, num_ranks等等。对应变量的值已经打印出来,参考这里
  7. define optimizer(Line 202~232): num_versions表示optimizer需要保存的参数的版本数量,在gpus=4_straight组别中,实际输出:

    | rank | num_versions |
    | —— | —————— |
    | 0 | 4 |
    | 1 | 3 |
    | 2 | 2 |
    | 3 | 1 |

  8. Load data(Line 236~298)

  9. 进入train() (Line 313)
  10. train::switch to train mode(Line 339 345): 需要设置r的一些变量。(TODO)
  11. train::warmup (Line 359~361):

    | rank | num_warmup_minibatches |
    | —— | ——————————— |
    | 0 | 3 |
    | 1 | 2 |
    | 2 | 1 |
    | 3 | 0 |

    对应的是如下红框阶段:

  12. train::static (Line 363~413): forward — backward — forward — backward …

  13. train::end (Line 415~421): 若干个backward

runtime/image_classification/models/vgg16/gpus=4_straight/mp_conf.json

原文内容如下:

1
2
3
4
{
"module_to_stage_map": [0, 1, 2, 3, 3],
"stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3]}
}

  1. 什么是module?

    参见runtime/image_classification/models/vgg16/gpus=4_straight/__init__.py中的model函数,里面返回了一个list,包含5个items,分别是:Stage0, Stage1, Stage2, Stage3, criterion(用于计算loss)。以上就是5个modules,把vgg模型划分成了5个部分。

    根据module_to_stage_map,有映射关系:

    1
    2
    3
    4
    5
    6
    module --> stage --> rank
    ---------------------------
    Stage0 --> stage0 --> rank0
    Stage1 --> stage1 --> rank1
    Stage2 --> stage2 --> rank2
    Stage3, criterion --> stage3 --> rank3

runtime/optimizer.py

main_with_runtime.py:222~229中引用,原文如下:

1
2
3
4
5
6
7
8
optimizer = sgd.SGDWithWeightStashing(r.modules(), r.master_parameters,
r.model_parameters, args.loss_scale,
num_versions=num_versions,
lr=args.lr,
momentum=args.momentum,
weight_decay=args.weight_decay,
verbose_freq=args.verbose_frequency,
macrobatch=args.macrobatch)

SGDWithWeightStashing继承自optimizer.py中的OptimizerWithWeightStashing(插入了一个optim_name)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class SGDWithWeightStashing(OptimizerWithWeightStashing):
"""
SGD optimizer with weight stashing.
"""
def __init__(self, modules, master_parameters, model_parameters,
loss_scale, num_versions, lr=required, momentum=0,
dampening=0, weight_decay=0, nesterov=False, verbose_freq=0,
macrobatch=False):
super(SGDWithWeightStashing, self).__init__(
optim_name='SGD',
modules=modules, master_parameters=master_parameters,
model_parameters=model_parameters, loss_scale=loss_scale,
num_versions=num_versions, lr=lr, momentum=momentum,
dampening=dampening, weight_decay=weight_decay,
nesterov=nesterov, verbose_freq=verbose_freq,
macrobatch=macrobatch,
)

一些参数的值:

全部打印在这里

  1. macrobatch=false. 若为真,则最多只存储2个版本的参数。
  2. queue中存放num_versions个(params, version)对。queue[0]中存放最旧的参数版本,queue[-1]中存放最新的参数版本。

runtime/runtime.py

全部打印在这里

StageRuntime::initialize

接受的参数:

  1. model: runtime/image_classification/models/vgg16/gpus=4_straight/__init__.py中的model()
  2. inputs_module_destinations: {“input”:0}
  3. configuration_maps: runtime/image_classification/models/vgg16/gpus=4_straight/mp_conf.json中的内容
  4. master_addr, rank, local_rank: 根据args确定
  5. num_ranks_in_server: 1
  6. self.target_tensor_names={“target”}
  7. `enable_compute`: 除了0是False,其他均为True。

处理过程:

  1. 初始化(Line 83~91): 其中criterion_input_name = 'out3'
  2. 给每个tensor赋予一个tag,最后得到self.tensor_tags={'input0': 1, 'out0': 2, 'out1': 3, 'out2': 4, 'out3': 5, 'loss': 6, 'target': 7, 'ack': 8}。注意,这里增加了两个额外的tensor,分别是targetacktarget用于传递label,ack待研究,应该与其名字作用相关TODO
  3. configure_map及其衍生
    • module_to_stage_map: 一对一
    • stage_to_rank_map: 一对多,如果一个stage对应多个rank,则此stage在多个rank间做了replicas备份
    • stage_to_depth: 暂时不用管
    • stage_to_module_map: 一对多, 一个stage可能由多个module组成。例如stage3由Stage3+criterion组成。
    • rank_to_stage_map: 一对一: 一个rank对应一个固定stage —> 对应可能多个module
    • ranks_in_previous_stage
    • num_ranks_in_previous_stage: 前者的长度
    • ranks_in_next_stage
    • num_ranks_in_next_stage
  4. modules_with_dependencies的生成

    • modules: list of int, 当前rank对应的module id

      | rank | modules | models |
      | —— | ———- | ———————————————————————————————— |
      | 0 | [0] | [(Stage0(), [“input0”], [“out0”])] |
      | 1 | [1] | [(Stage1(), [“out0”], [“out1”])] |
      | 2 | [2] | [(Stage2(), [“out1”], [“out2”])] |
      | 3 | [3,4] | [(Stage3(), [“out2”], [“out3”]),(criterion, [“out3”], [“loss”])] |

  5. comm_handler: TODO

  6. send_ranks, receive_ranks(Line 192~206, Line 208~222): 枚举,若module i的输出(tensor_name)在module j的输入中,并且i与j对应的stage不是同一个,则需要将tensor_name发送。’input’与’input0’有什么区别?(TODO)
  7. move to cuda(Line 224~229)
  8. 在replicas间做数据并行(Line 232~279):
    1. groups: groups[stageID]表示第stageID个stage的通信域。等某个stage对应多个rank的时候才会创建新的通信域(用于多replicas间的数据并行通信)
    2. modules[i]使用torch.nn.parallel.DistributedDataParallel()包装。
    3. 计算本机对应需要做数据并行的num_parameters, module_size.

StageRuntime::train

  1. 训练过程中的多版本tensor与weights的变化情况:

  2. 对comm_handler作初始化。

  3. modules[i].train: 参考这里,设置为train模式。

StageRuntime::run_forward

  1. 接受tensor
  2. 执行前向计算
  3. 发送tensors
  4. 更新信息: forward_minibatch_id+=1

StageRuntime::receive_tensors_forward

由于代码中没有调用set_loader,所以self.loader_iter is None。

Togo: 按我的理解,第0个stage应该有loader_iter,负责读取数据集输入?

  1. comm_handler.recv() (Line 415~420):
  2. comm_handler.increment_messaging_index() (Line 426~428): 处理replicas情形,暂不管。

runtime/communication.py

打印的值
基本流程概览:

接受的参数

  1. master_addr: localhost
  2. master_port: 12345
  3. rank, local_rank: same as in args
  4. num_ranks_in_server: 1
  5. world_size: 4
  6. fp16: False
  7. distributed_backend : gloo

__init__

  1. 调用torch.distributed.init_process_group()初始化了一个通信域。
  2. 由于传入的num_ranks_in_server==1所以直接return。

initialize

  1. receive_ranks: 不仅包括传入的中间变量,还包括target。e.g. rank=2, comm_handler.receive_ranks={'out1': [1], 'target': [1]}
  2. send_ranks: 与receive_ranks相对应。e.g. rank=1, comm_handler.send_ranks={'out1': [2], 'target': [2]}
  3. tensor_tags: 每个rank都一样,{'input0': 1, 'out0': 2, 'out1': 3, 'out2': 4, 'out3': 5, 'loss': 6, 'target': 7, 'ack': 8}
  4. target_tensor_names: {'target}
  5. rank_in_stage: 在replica情形下,在子数据并行模块中的rank。此处均为0.
  6. num_ranks_in_stage: 此处均为1
  7. ranks_in_previous_stage: 如名。
  8. num_ranks_in_previous_stage: 上一项的长度
  9. ranks_in_next_stage
  10. num_ranks_in_next_stage

setup_queues

  1. forward_receive_queues
  2. backward_receive_queues
  3. forward_send_queues
  4. backward_send_queues
  5. num_forward_threads
  6. num_backward_threads
  7. num_ack_threads
rank FR FS BR BS NF NB NA
0 target out0, target out0, ack ack 2 1 1
1 out0, target out1, target out1, ack out0, ack 4 2 2
2 out1, target out2, target out2, ack out1, ack 4 2 2
3 out2, target target ack out2,ack 2 1 1

在执行(Line 130~146)时,receive_ranks={'out1':[1]}, 在(Line 165~178)中会讲target加入receive_ranks。其余同理。

setup_messaging_schedule

应该是用于有replicas的情况下,应该传输给谁。

  1. fwd_messaging_scheduling_row(FR)
  2. fwd_messaging_scheduling_col(FC)
  3. bwd_messaging_scheduling_row(BR)
  4. bwd_messaging_scheduling_col(BC)
rank FR FC BR BC
0 -1 0 -1 0
1 0 0 0 0
2 0 0 0 0
3 0 0 0 0

create_process_groups

由于num_ranks_in_server=1,所以直接跳过。

start_helper_threads

  1. 设置counter为总线程数量。当counter归零时,表示所有线程安全退出。
  2. 处理ack, 由于是train模式,所以讲ack删除, 所以后面还会用到ack吗?
  3. 计算前向反向迭代次数,用于处理有replicas的情况(Line 269~273)
  4. dtype=torch.float32
  5. 跳过target或者ack 为什么这里要跳过target?

start_helper_thread

根据给的必要信息,启动send_helper_thread或者recv_helper_thread

send_helper_thread_args

根据提供的tensor_nameindexbackward:Bool, num_iterations,返回对应的
(queue, self.counter, self.local_rank, tensor_name, self.rank, dst_rank, tag, sub_process_group, num_iterations)

由于ranks_in_server=[],所以is_gpu_to_gpu_comm总是返回False,所以总是设置sub_process_group=None

send_helper_thread

  1. tensor=queue.remove() 参照runtime/threadsafe_queue.py:20
  2. 调用_send发送

_send

由于sub_process_group==None,所以总是现将tensor复制到cpu再发送。

  1. 转到cpu
  2. 发送shape
  3. 发送tensor

_recv

同样经过cpu中转。

  1. 接受shape
  2. 根据shape创建同样大小的tensor
  3. 接受tensor
  4. 转到gpu

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。