# PyTorch: Model parallelism on two GPUs

*Notebook written by the IDRIS AI support team, May 2021*

This notebook demonstrates how to implement model parallelism on Jean Zay, as presented in [PyTorch: Multi GPU model parallelism](http://www.idris.fr/eng/ia/model-parallelism-pytorch-eng.html). For the sake of clarity, this demo runs a simple training (no validation step) of the **Resnet101** model on the **Imagenet** dataset.

Note that only the pipelined version of model parallelism is covered here.

The demo covers:

* Distribution of the model layers on two GPUs.
* Data loading on the GPU that holds the first layers of the model, and label loading on the other GPU.
* Slurm file setup.

----


## Environment checks
 
This notebook is intended to be run from a Jean Zay front end: the hostname must be one of jean-zay[1-5]. A PyTorch module must be loaded beforehand for this notebook to work correctly, for example the ``pytorch-gpu/py3/1.8.0`` module.
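
The hostname check can also be done from Python. The sketch below is an illustration, not part of the IDRIS tooling: it assumes the front ends report the short hostnames `jean-zay1` to `jean-zay5` (as in the `!hostname` output below) and, as a precaution, strips any domain suffix. The function name `is_front_end` is hypothetical.

```python
import re
import socket

# Front-end hostnames stated in this notebook: jean-zay1 to jean-zay5
FRONT_END_PATTERN = re.compile(r"jean-zay[1-5]")


def is_front_end(hostname: str) -> bool:
    """Return True if the short hostname matches jean-zay[1-5]."""
    short_name = hostname.split(".")[0]  # drop any domain suffix (assumption)
    return FRONT_END_PATTERN.fullmatch(short_name) is not None


if __name__ == "__main__":
    host = socket.gethostname()
    status = "front end" if is_front_end(host) else "not a front end"
    print(f"{host}: {status}")
```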
 


In [1]:
!hostname

jean-zay1


In [2]:
!module list

[?1h=Currently Loaded Modulefiles:[m
 1) gcc/8.3.1 4) cudnn/8.0.4.30-cuda-10.2 7) [4mopenmpi/4.0.5-cuda[0m [m
 2) cuda/10.2 5) intel-mkl/2020.4 8) pytorch-gpu/py3/1.8.0 [m
 3) nccl/2.8.3-1-cuda 6) magma/2.5.4-cuda [m
[K[?1l>

Additional setup consists of creating three folders: one for the Slurm scripts, one for the logs and one for the saved models.


In [3]:
!mkdir slurm

mkdir: cannot create directory ‘slurm’: File exists


In [4]:
!mkdir log

mkdir: cannot create directory ‘log’: File exists


In [5]:
!mkdir checkpoint

mkdir: cannot create directory ‘checkpoint’: File exists
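
The `File exists` messages above only mean that the folders were created by a previous run. A rerun-safe alternative (the shell equivalent is `mkdir -p`) can be sketched with the standard library; the helper name `make_folders` is illustrative only.

```python
import os

# Folders used later in this notebook: Slurm scripts, logs and checkpoints
FOLDERS = ["slurm", "log", "checkpoint"]


def make_folders(base_dir: str = ".") -> list:
    """Create the working folders; exist_ok=True makes reruns silent."""
    created = []
    for name in FOLDERS:
        path = os.path.join(base_dir, name)
        os.makedirs(path, exist_ok=True)  # no error if the folder already exists
        created.append(path)
    return created
```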


## Model setup


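We rely on the `torchvision` model zoo: `PipelinedResnet` subclasses its `ResNet` class with `Bottleneck` blocks in the `[3, 4, 23, 3]` configuration, which corresponds to **Resnet101**.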

The required adaptations are:

* Distribution of the model layers on the two available GPUs (referred to as `dev0` and `dev1`) and setup of the communication between them.
* Setup of the pipeline, i.e. the data batch is split into chunks so that both GPUs can work concurrently (see the `split_size` variable and the `forward()` method).
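
A toy cost model shows why splitting the batch helps. This is a simplified sketch, not a measurement: it assumes the two stages take `t0` and `t1` seconds for a full batch on `dev0` and `dev1`, that all chunks cost the same, and that inter-GPU transfers and per-chunk overhead are free.

```python
def sequential_time(t0: float, t1: float) -> float:
    """Naive model parallelism: dev1 waits for dev0 to finish the whole batch."""
    return t0 + t1


def pipelined_time(t0: float, t1: float, n_chunks: int) -> float:
    """Idealized two-stage pipeline with n_chunks equal chunks.

    Each chunk costs t0/n on dev0 and t1/n on dev1. The first chunk must cross
    both stages, then one chunk leaves the pipeline every max(t0, t1)/n.
    Transfer time and per-chunk kernel overhead are ignored (assumption).
    """
    a, b = t0 / n_chunks, t1 / n_chunks
    return a + b + (n_chunks - 1) * max(a, b)


for n in (1, 2, 4, 16):
    print(n, pipelined_time(1.0, 1.0, n))
```

With equal stage times, the idealized time falls from 2.0 (one chunk, same as `sequential_time`) toward 1.0 as the number of chunks grows. In practice each extra chunk adds launch and transfer overhead that this model ignores, which is why the chunk size has to be benchmarked.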


### Model creation



In [6]:
%%writefile resnet.py

import torch
import torch.nn as nn
from torchvision.models.resnet import ResNet, Bottleneck

num_classes = 1000


class PipelinedResnet(ResNet):
    def __init__(self, dev0, dev1, split_size=8, *args, **kwargs):
        super(PipelinedResnet, self).__init__(
            Bottleneck, [3, 4, 23, 3], num_classes=num_classes, *args, **kwargs)
        # dev0 and dev1 point to the GPUs (usually gpu:0 and gpu:1)
        self.dev0 = dev0
        self.dev1 = dev1
        self.split_size = split_size

        self.seq0 = nn.Sequential(
            self.conv1,
            self.bn1,
            self.relu,
            self.maxpool,
            self.layer1,
            self.layer2
        ).to(self.dev0)  # sends the first sequence of the model to the first GPU

        self.seq1 = nn.Sequential(
            self.layer3,
            self.layer4,
            self.avgpool,
        ).to(self.dev1)  # sends the second sequence of the model to the second GPU

        self.fc.to(self.dev1)  # last layer is on the second GPU


Overwriting resnet.py
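
The `[3, 4, 23, 3]` argument is what makes this a 101-layer network, and it also shows how the residual blocks are shared between the GPUs. A quick check, counting only convolution and fully connected layers (the usual convention for ResNet depth):

```python
# Bottleneck block counts per stage passed to ResNet in resnet.py
LAYERS = [3, 4, 23, 3]  # layer1, layer2, layer3, layer4

# Each Bottleneck block holds 3 convolutions; add the stem conv1 and the final fc
depth = 3 * sum(LAYERS) + 2

# Blocks placed on each GPU by PipelinedResnet
blocks_dev0 = LAYERS[0] + LAYERS[1]  # layer1 + layer2 in seq0
blocks_dev1 = LAYERS[2] + LAYERS[3]  # layer3 + layer4 in seq1

print(depth, blocks_dev0, blocks_dev1)  # 101 7 26
```

Block count is only a rough proxy for load: the blocks of `layer1` and `layer2` work on larger feature maps than those of `layer3` and `layer4`, so the actual balance between `dev0` and `dev1` is best measured.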


### Communication between layers 

Without pipelining (simple model parallelism), the `forward()` method describes the data flow between the two GPUs, from the input `x` to the last layer of the model:

 
```py
def forward(self, x):
    x = self.seq0(x)  # apply the first sequence of the model to input x
    x = x.to(self.dev1)  # send the intermediate result to the second GPU
    x = self.seq1(x)  # apply the second sequence of the model to x
    return self.fc(x.view(x.size(0), -1))
```

In pipeline mode, the behavior depends on the `split_size` variable: each input batch is cut into chunks of `split_size` samples that go through the GPUs one after the other, so a batch of 128 samples with the default `split_size=8` is processed as 16 chunks. Here we use the default value, but it should be tuned case by case, ideally by benchmarking (as shown in the [PyTorch documentation](https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html#speed-up-by-pipelining-inputs)).
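
In `forward()`, `x.split(self.split_size, dim=0)` cuts the batch into chunks of `split_size` samples each, so the number of chunks follows from the batch size. The pure-Python helper below mirrors the chunk sizes returned by `torch.Tensor.split` with an integer argument; the name `chunk_sizes` is illustrative only.

```python
def chunk_sizes(batch_size: int, split_size: int) -> list:
    """Sizes of the chunks produced by tensor.split(split_size, dim=0).

    Every chunk holds split_size samples except possibly the last one.
    """
    full, rest = divmod(batch_size, split_size)
    return [split_size] * full + ([rest] if rest else [])


print(len(chunk_sizes(128, 8)))  # 16 chunks per batch in this notebook's runs
print(chunk_sizes(20, 8))        # [8, 8, 4]
```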


In [1]:
%%writefile -a resnet.py

    def forward(self, x):
        # split the input batch x (images only) into chunks of split_size samples
        splits = iter(x.split(self.split_size, dim=0))
        s_next = next(splits)
        s_prev = self.seq0(s_next).to(self.dev1)
        ret = []

        for s_next in splits:
            # A. s_prev runs on dev1
            s_prev = self.seq1(s_prev)
            ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

            # B. s_next runs on dev0, which can run concurrently with A
            s_prev = self.seq0(s_next).to(self.dev1)

        s_prev = self.seq1(s_prev)
        ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

        return torch.cat(ret)

Appending to resnet.py
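
The order in which this `forward()` feeds the two GPUs can be traced without any hardware. The sketch below is a schematic model of the intended overlap, assuming work issued on different devices runs concurrently; it does not measure actual execution, and `pipeline_schedule` is a hypothetical helper.

```python
def pipeline_schedule(n_chunks: int) -> list:
    """List, step by step, which chunk each GPU works on in forward().

    Mirrors the loop structure: step 0 runs chunk 0 on dev0 only; in each loop
    iteration dev1 processes chunk i-1 while dev0 processes chunk i; the final
    step runs the last chunk on dev1 only. None means the GPU is idle.
    """
    steps = [(0, None)]  # (chunk on dev0, chunk on dev1)
    for i in range(1, n_chunks):
        steps.append((i, i - 1))
    steps.append((None, n_chunks - 1))
    return steps


for step, (d0, d1) in enumerate(pipeline_schedule(4)):
    print(f"step {step}: dev0 -> chunk {d0}, dev1 -> chunk {d1}")
```

With `n` chunks the schedule takes `n + 1` steps, and both GPUs are busy in every step except the first and the last.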


## Dataset creation

The validation split of Imagenet (`val`) is used because it allows short training loops (it contains only 50,000 samples). In the same vein, we apply only a minimal set of transformations to the data.


In [8]:
%%writefile dataset.py

import os

import idr_torch  # see http://www.idris.fr/jean-zay/gpu/jean-zay-gpu-torch-multi.html
import torch
import torchvision
import torchvision.transforms as transforms

def get_dataset(ds_name='val', batch_size=256, num_workers=8):
    transform = transforms.Compose([
        transforms.RandomResizedCrop(224),  # Random resize - Data Augmentation
        transforms.ToTensor(),  # convert the PIL Image to a tensor
    ])
    # dataset creation
    train_ds = torchvision.datasets.ImageNet(root=os.environ['DSDIR'] + '/imagenet/RawImages',
                                             transform=transform,
                                             split=ds_name)
    # data loader creation with a couple of optimizations (pin_memory and prefetch)
    train_loader = torch.utils.data.DataLoader(dataset=train_ds,
                                               batch_size=batch_size,
                                               shuffle=True, num_workers=num_workers,
                                               pin_memory=True, drop_last=True,
                                               prefetch_factor=2)

    return train_loader

Overwriting dataset.py
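
With the 50,000 validation images, the length of an epoch follows from the batch size. Because `get_dataset()` sets `drop_last=True`, the incomplete last batch is skipped, which is also why the `train()` function defined below can compute the accuracy as `corrects / ((batch_counter + 1) * batch_size)`. A quick check for the 128-sample batches used in the runs (the helper name is illustrative):

```python
def batches_per_epoch(n_samples: int, batch_size: int, drop_last: bool = True):
    """Return (number of batches, samples skipped) for one epoch of a DataLoader."""
    full, rest = divmod(n_samples, batch_size)
    if drop_last:
        return full, rest  # the last, incomplete batch is dropped
    return full + (1 if rest else 0), 0


n_batches, skipped = batches_per_epoch(50_000, 128)
print(n_batches, skipped)  # 390 80
```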


## Main code

### Imports 



In [9]:
%%writefile demo_model_distribution.py

import argparse
import time
from datetime import timedelta

import idr_torch # see http://www.idris.fr/jean-zay/gpu/jean-zay-gpu-torch-multi.html
import torch
import torch.distributed as dist
import torch.nn as nn

from resnet import PipelinedResnet
import dataset

Overwriting demo_model_distribution.py


### Useful functions

In [10]:
%%writefile -a demo_model_distribution.py

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('-b', '--batch-size', default=128, type=int,
                        help='batch size')
    parser.add_argument('-e', '--epochs', default=2, type=int, metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('-k', '--checkpoint-path', default='checkpoint/test_', type=str,
                        help='relative path prefix for saved checkpoints (epoch number and .pt are appended)')
    parser.add_argument('-r', '--resume-model', default='', type=str,
                        help='relative path to model to load before resuming training')
    args = parser.parse_args()
    return args


def convert(seconds):
    return time.strftime("%H:%M:%S", time.gmtime(seconds))


Appending to demo_model_distribution.py


### Training loop

Now that the model is split across two GPUs, we need to decide which GPU each part of the samples is sent to:

* Images go to the first GPU, which holds the first layers of the model.
* Labels must be on the same GPU as the output of the model (i.e. the predictions), hence the second GPU.

Saving the model (see the `training()` function) works the same way as with a single GPU.


In [11]:
%%writefile -a demo_model_distribution.py

def train(model, optimizer, criterion, train_loader, batch_size, gpu):
    model.train()
    if idr_torch.rank == 0:
        running_train_loss = 0.0
        running_train_corrects = 0
    for batch_counter, (images, labels) in enumerate(train_loader):
        # images are sent to the first GPU
        images = images.to(gpu[0], non_blocking=True)
        # zero the parameter gradients
        optimizer.zero_grad()
        # forward
        with torch.set_grad_enabled(True):
            outputs = model(images)
            # labels (ground truth) are sent to the GPU where the outputs of the model
            # reside, which in this case is the second GPU
            labels = labels.to(outputs.device, non_blocking=True)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)
            # backward pass + optimization step
            loss.backward()
            optimizer.step()
        if idr_torch.rank == 0:
            # statistics
            running_train_loss += loss.item()
            running_train_corrects += torch.sum(preds == labels.data).item()
    if idr_torch.rank == 0:
        epoch_loss = running_train_loss / (batch_counter + 1)
        epoch_acc = 100.0 * running_train_corrects / ((batch_counter + 1) * batch_size)
        print(f'Epoch Train Loss: {epoch_loss:.2f} Acc: {epoch_acc:.2f}')


def training(model, train_loader, epochs, batch_size, gpu, checkpoint_path):
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), 1e-3)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)

    if idr_torch.rank == 0:
        total_time = time.time()

    for epoch in range(epochs):
        if idr_torch.rank == 0:
            print(f"Epoch {epoch + 1}/{epochs}")
            t = time.time()

        train(model, optimizer, criterion, train_loader, batch_size, gpu)
        scheduler.step()

        if idr_torch.rank == 0:
            duration = time.time() - t
            print(f"\t Duration : {duration:.2f}")
            print(f"Saving model at epoch {epoch}")
            name = f"{checkpoint_path}{epoch}.pt"
            torch.save(model.state_dict(), name)

    if idr_torch.rank == 0:
        total_time_elapsed = time.time() - total_time

    if idr_torch.rank == 0:
        print("-------------------------------------")
        print(f"Total time: {total_time_elapsed:.2f} \t {convert(total_time_elapsed)}")
        print("-------------------------------------")
        time.sleep(2)  # used only to get a clean log
        for g in gpu:
            # max_memory_allocated returns bytes; dividing by 1024 * 1024 gives MB
            print(f"Device id {g} max memory usage: {torch.cuda.max_memory_allocated(g) // (1024 * 1024)} MB")


Appending to demo_model_distribution.py
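
`torch.cuda.max_memory_allocated()` returns a number of bytes, so dividing by `1024 * 1024` gives mebibytes, not gigabytes: the peak values printed in the logs below (around 8000 on 16 GB V100 cards) are in MiB. A small conversion helper, illustrative and not part of the demo scripts, using the 7950 figure from the first run's log as example input:

```python
MIB = 1024 * 1024
GIB = 1024 * MIB


def mib_and_gib(n_bytes: int):
    """Convert a byte count (e.g. from torch.cuda.max_memory_allocated) to MiB and GiB."""
    return n_bytes // MIB, n_bytes / GIB


peak_bytes = 7950 * MIB  # example value taken from the log, not a new measurement
print(mib_and_gib(peak_bytes))  # (7950, 7.763671875)
```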


### Setup 




In [12]:
%%writefile -a demo_model_distribution.py

def get_model(gpu, load_model=None):
    mp_model = PipelinedResnet(dev0=gpu[0], dev1=gpu[1])
    if load_model:
        print(f"Loading model {load_model}")
        mp_model.load_state_dict(torch.load(load_model))
    return mp_model

def main():
    args = parse_args()
    batch_size, epochs, checkpoint_path = args.batch_size, args.epochs, args.checkpoint_path
    if idr_torch.rank == 0:
        print(f"Current setup:")
        print(f"\tTraining on {batch_size} samples per batch, for {epochs} epoch(s).")

    gpu = [0, 1]
    torch.cuda.set_device(0)
    train_loader = dataset.get_dataset(batch_size=batch_size)
    time.sleep(2)  # used only to get a clean log
    model = get_model(gpu=gpu, load_model=args.resume_model)
    time.sleep(2)  # used only to get a clean log
    training(model, train_loader, epochs, batch_size, gpu, checkpoint_path)


if __name__ == '__main__':
    main()

Appending to demo_model_distribution.py


## Demonstration

We will perform two runs, each using two GPUs for the model:

* The first run creates checkpoints, i.e. it saves the model's current state at each epoch.
* The second run loads a checkpoint from the first run, then resumes training.

On a node with four GPUs, we reserve half of the GPUs and a single task. Since the memory allocated to a job is proportional to the number of reserved CPUs, half of the node's memory is obtained by requesting half of its CPUs with the `#SBATCH --cpus-per-task` option, matching the half of the GPUs requested.
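
The value of `--cpus-per-task` follows from the GPU share of the node. The sketch below assumes a node with 4 GPUs and 40 CPU cores, which is consistent with the comment in the Slurm scripts that 20 CPUs is half of a 4-GPU node; check the IDRIS documentation for the partition you actually use. The function name is hypothetical.

```python
def cpus_per_task(gpus_requested: int, gpus_per_node: int = 4,
                  cpus_per_node: int = 40) -> int:
    """CPUs to request so that the CPU (and thus memory) share matches the GPU share."""
    if not 0 < gpus_requested <= gpus_per_node:
        raise ValueError("gpus_requested must be between 1 and gpus_per_node")
    return cpus_per_node * gpus_requested // gpus_per_node


print(cpus_per_task(2))  # 20, the value used in the Slurm scripts below
```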


### First execution 

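Make sure the number of GPUs and tasks per node is set correctly in the Slurm file: here two GPUs (`--gres=gpu:2`) and a single task (`--ntasks-per-node=1`), since one Python process drives both GPUs.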

**Reminder**: If your single project has both CPU and GPU hours, or if your login is attached to more than one project, you must specify for which allocation the consumed hours should be counted by adding the option `--account=my_project@gpu` as explained in the [IDRIS documentation](http://www.idris.fr/eng/jean-zay/cpu/jean-zay-cpu-doc_account-eng.html).


In [13]:
%%writefile slurm/prime_run.slurm
#!/bin/bash
#SBATCH --job-name=save
#SBATCH --output=log/prime_run.out
#SBATCH --error=log/prime_run.err
#SBATCH --gres=gpu:2
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --hint=nomultithread
#SBATCH --time=00:30:00
#SBATCH --qos=qos_gpu-dev
#SBATCH -C v100-16g
#SBATCH --cpus-per-task=20 # on a node with 4 GPUs, this is half of the available CPUs (and memory)


## load Pytorch module
module purge
module load pytorch-gpu/py3/1.8.0

## launch script on every node
set -x
time srun python -u demo_model_distribution.py -b 128 -e 2 -k "checkpoint/test_"
date

Overwriting slurm/prime_run.slurm


In [14]:
!sbatch 'slurm/prime_run.slurm'

Submitted batch job 1103082


`idr_pytools` is a small script which enables access to Slurm variables. For more information, see [Python scripts for automated execution of GPU jobs](http://www.idris.fr/eng/jean-zay/gpu/scripts-python-execution-travaux-gpu-eng.html).


In [15]:
from idr_pytools import display_slurm_queue

job_name = 'save' 
display_slurm_queue(job_name)

 JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
 1103082 gpu_p13 save ssos021 R 13:41 1 r10i1n0

 Done!


In [16]:
!cat "log/prime_run.err"

Loading pytorch-gpu/py3/1.8.0
 Loading requirement: gcc/8.3.1 cuda/10.2 nccl/2.8.3-1-cuda
 cudnn/8.0.4.30-cuda-10.2 intel-mkl/2020.4 magma/2.5.4-cuda
 openmpi/4.0.5-cuda
+ srun python -u demo_model_distribution.py -b 128 -e 2 -k checkpoint/test_

real	13m40.827s
user	0m0.014s
sys	0m0.009s
+ date


In [17]:
!cat "log/prime_run.out"

Current setup:
	Training on 128 samples per batch, for 2 epoch(s).
Epoch 1/2
Epoch Train Loss: 7.00 Acc: 0.13
	 Duration : 403.34
Saving model at epoch 0
Epoch 2/2
Epoch Train Loss: 6.93 Acc: 0.12
	 Duration : 393.20
Saving model at epoch 1
-------------------------------------
Total time: 797.47 	 00:13:17
-------------------------------------
Device id 0 max memory usage: 7950 GB
Device id 1 max memory usage: 8224 GB
Mon Apr 26 17:36:01 CEST 2021


### Second execution: Load a checkpoint before resuming training

The model parameters saved during the previous execution will be loaded during the model creation step.
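
Checkpoints are named `<checkpoint_path><epoch>.pt`, as the `ls` output below shows. Instead of hard-coding `-r checkpoint/test_1.pt`, a launcher could pick the most recent file for a given prefix. The helpers below are a hypothetical addition, not part of `demo_model_distribution.py`:

```python
import os
import re


def checkpoint_name(prefix: str, epoch: int) -> str:
    """File name written by training(): f"{checkpoint_path}{epoch}.pt"."""
    return f"{prefix}{epoch}.pt"


def latest_checkpoint(prefix: str):
    """Return the existing checkpoint with the highest epoch for this prefix, or None."""
    directory, stem = os.path.split(prefix)
    directory = directory or "."
    pattern = re.compile(re.escape(stem) + r"(\d+)\.pt")
    best = None  # (epoch, path)
    for name in os.listdir(directory):
        match = pattern.fullmatch(name)
        if match and (best is None or int(match.group(1)) > best[0]):
            best = (int(match.group(1)), os.path.join(directory, name))
    return best[1] if best else None
```

Parsing the epoch as an integer matters once there are ten or more epochs: a plain alphabetical sort would place `test_10.pt` before `test_9.pt`.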


In [18]:
!ls "checkpoint"

check_0.pt check_1.pt	test_0.pt test_1.pt


In [19]:
%%writefile slurm/second_run.slurm
#!/bin/bash
#SBATCH --job-name=load
#SBATCH --output=log/second_run.out
#SBATCH --error=log/second_run.err
#SBATCH --gres=gpu:2
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --hint=nomultithread
#SBATCH --time=00:30:00
#SBATCH --qos=qos_gpu-dev
#SBATCH -C v100-16g
#SBATCH --cpus-per-task=20


## load Pytorch module
module purge
module load pytorch-gpu/py3/1.8.0

## launch script on every node
set -x
time srun python -u demo_model_distribution.py -b 128 -e 2 -k "checkpoint/check_" -r "checkpoint/test_1.pt"
date

Overwriting slurm/second_run.slurm


In [20]:
!sbatch 'slurm/second_run.slurm'

Submitted batch job 1103323


In [21]:
job_name = 'load' 
display_slurm_queue(job_name)

 JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
 1103323 gpu_p13 load ssos021 R 13:32 1 r10i1n0

 Done!


In [22]:
!cat "log/second_run.err"

Loading pytorch-gpu/py3/1.8.0
 Loading requirement: gcc/8.3.1 cuda/10.2 nccl/2.8.3-1-cuda
 cudnn/8.0.4.30-cuda-10.2 intel-mkl/2020.4 magma/2.5.4-cuda
 openmpi/4.0.5-cuda
+ srun python -u demo_model_distribution.py -b 128 -e 2 -k checkpoint/check_ -r checkpoint/test_1.pt

real	13m30.334s
user	0m0.014s
sys	0m0.008s
+ date


In [23]:
!cat "log/second_run.out"

Current setup:
	Training on 128 samples per batch, for 2 epoch(s).
Loading model checkpoint/test_1.pt
Epoch 1/2
Epoch Train Loss: 6.91 Acc: 0.11
	 Duration : 397.74
Saving model at epoch 0
Epoch 2/2
Epoch Train Loss: 6.90 Acc: 0.16
	 Duration : 392.86
Saving model at epoch 1
-------------------------------------
Total time: 791.35 	 00:13:11
-------------------------------------
Device id 0 max memory usage: 7950 GB
Device id 1 max memory usage: 8219 GB
Mon Apr 26 17:49:42 CEST 2021
