# Pytorch : Parallélisme de modèle sur deux gpus


*Notebook rédigé par l'équipe assistance IA de l'IDRIS, juin 2021*

Ce notebook met en pratique les conseils donnés dans la documentation IDRIS ([PyTorch : Parallélisme de modèle multi GPU](http://www.idris.fr/ia/model-parallelism-pytorch.html)). Pour cette démonstration, nous nous limitons à un entrainement simple (sans validation) avec un modèle **resnet101** appliqué au jeu de données **Imagenet** (disponible sur **DSDIR**).
Seule la version *pipeline* est implémentée.

Ces codes fonctionnent dans le module **pytorch 1.8.0** (avec la version 2.8.3 de **NCCL**).

Le procédé comprend plusieurs étapes :

* les couches du modèle sont réparties sur deux GPU
* lors de l'entraînement, les données sont chargées sur les bons GPU, ie le GPU contenant les premières couches du modèle pour les entrées, et le GPU contenant la dernière couche pour la comparaison avec la donnée vérité (ou label)
* le fichier slurm doit réserver un seul job pour deux GPU    


## Vérification de l'environnement

Ce notebook est prévu pour être exécuté sur une frontale de Jean-Zay (jean-zay{1-5}) et avec une version de Pytorch >= 1.8.0. Il faut donc charger préalablement cet environnement.

In [2]:
!hostname

jean-zay4


In [3]:
!module list

[?1h=Currently Loaded Modulefiles:[m
 1) gcc/8.3.1           4) cudnn/8.0.4.30-cuda-10.2   7) openmpi/4.0.2-cuda     [m
 2) cuda/10.2           5) intel-mkl/2020.1           8) [4mpytorch-gpu/py3/1.7.1[0m  [m
 3) nccl/2.7.8-1-cuda   6) magma/2.5.3-cuda          [m
[K[?1l>

Pour permettre l'exécution du code sur les noeuds de calculs, il faut créer quelques répertoires pour les scripts Slurm, les logs et enfin la sauvegarde de modèles :

In [4]:
!mkdir slurm

In [5]:
!mkdir log

In [6]:
!mkdir checkpoint

## Adaptation du modèle

Le modèle qui sert de support est Resnet101 disponible dans `torchvision`. 

Les modifications consistent :

* à répartir les différentes couches du modèle sur les GPUs disponibles puis à s'assurer de la bonne communication entre les couches (dans la fonction `forward()`, voir plus bas). Les variables `dev0` et `dev1` sont les références des GPU à utiliser. 
* à mettre en place un *pipeline*, c'est à dire à diviser le *batch* de données en plusieurs mini *batches* pour que les GPU puissent exécuter le code en parallèle (variable `split_size` et fonction `forward`)


### Création du modèle



In [6]:
%%writefile resnet.py

import torch
import torch.nn as nn
from torchvision.models.resnet import ResNet, Bottleneck

num_classes = 1000


class PipelinedResnet(ResNet):
    def __init__(self, dev0, dev1, split_size=8, *args, **kwargs):
        super(PipelinedResnet, self).__init__(
            Bottleneck, [3, 4, 23, 3], num_classes=num_classes, *args, **kwargs)
        # dev0 and dev1 point to the GPU (usually gpu:0 and gpu:1)
        self.dev0 = dev0
        self.dev1 = dev1
        self.split_size = split_size

        self.seq0 = nn.Sequential(
            self.conv1,
            self.bn1,
            self.relu,
            self.maxpool,
            self.layer1,
            self.layer2
        ).to(self.dev0)  # sends the first sequence of the model to the first GPU

        self.seq1 = nn.Sequential(
            self.layer3,
            self.layer4,
            self.avgpool,
        ).to(self.dev1)  # sends the second sequence of the model to the second GPU

        self.fc.to(self.dev1)  # last layer is on the second GPU


Overwriting resnet.py


### Liaison entre les couches

Sans optimisation, la fonction `forward()` se résume à décrire le flux de données entre les deux GPU, depuis l'entrée `x` jusqu'à la dernière couche du modèle :
    
```py 
def forward(self, x):
    x= self.seq0(x)     # apply first sequence of the model on input x
    x= x.to(self.dev1)  # send the intermediary result to the second GPU
    x = self.seq1(x)    # apply second sequence of the model to x
    return self.fc(x.view(x.size(0), -1))
```

Le mode *pipeline* dépend de la variable `split_size` qui est à ajuster au cas par cas (idéalement via un *benchmarks* comme présenté sur la documentation officielle). Ici, nous prenons la valeur par défaut, soit 8.

In [7]:
%%writefile -a resnet.py

    def forward(self, x):
        # split setup for x, containing a batch of (image, label) as a tensor
        splits = iter(x.split(self.split_size, dim=0))
        s_next = next(splits)
        s_prev = self.seq0(s_next).to(self.dev1)
        ret = []

        for s_next in splits:
            # A. s_prev runs on dev1
            s_prev = self.seq1(s_prev)
            ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

            # B. s_next runs on dev0, which can run concurrently with A
            s_prev = self.seq0(s_next).to(self.dev1)

        s_prev = self.seq1(s_prev)
        ret.append(self.fc(s_prev.view(s_prev.size(0), -1)))

        return torch.cat(ret)

Appending to resnet.py


## Création du dataset

Nous utilisons le sous jeu de données de validation de Imagenet (`val`), qui a l'avantage de n'avoir que 50.000 échantillons, ce qui permet d'exécuter un entrainement sur plusieurs époques en quelques minutes (dans le même ordre d'idée et par soucis de concision, nous n'appliquons pas la totalité des transformations conseillées pour ce dataset). 


In [8]:
%%writefile dataset.py

import os

import idr_torch  # see http://www.idris.fr/jean-zay/gpu/jean-zay-gpu-torch-multi.html
import torch
import torchvision
import torchvision.transforms as transforms

def get_dataset(ds_name='val', batch_size=256, num_workers=8):
    transform = transforms.Compose([
        transforms.RandomResizedCrop(224),  # Random resize - Data Augmentation
        transforms.ToTensor(),  # convert the PIL Image to a tensor
    ])
    # dataset creation
    train_ds = torchvision.datasets.ImageNet(root=os.environ['DSDIR'] + '/imagenet/RawImages',
                                            transform=transform,
                                            split=ds_name)  
    # data loader creation with a couple of optimization (pin_memory and prefetch)
    train_loader = torch.utils.data.DataLoader(dataset=train_ds,
                                               batch_size=batch_size,
                                               shuffle=True, num_workers=num_workers,
                                               pin_memory=True, drop_last=True, 
                                               prefetch_factor=2)    
    
    return train_loader

Overwriting dataset.py


## Code principal

### Imports 



In [9]:
%%writefile demo_model_distribution.py

import argparse
import time
from datetime import timedelta

import idr_torch  # see http://www.idris.fr/jean-zay/gpu/jean-zay-gpu-torch-multi.html
import torch
import torch.distributed as dist
import torch.nn as nn

from resnet import PipelinedResnet
import dataset

Overwriting demo_model_distribution.py


### Fonctions utiles

In [10]:
%%writefile -a demo_model_distribution.py

def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('-b', '--batch-size', default=128, type=int,
                        help='batch size')
    parser.add_argument('-e', '--epochs', default=2, type=int, metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('-k', '--checkpoint-path', default='checkpoint/test_', type=str,
                        help='relative path where model can be saved')
    parser.add_argument('-r', '--resume-model', default='', type=str,
                        help='relative path to model to load before resuming training')
    args = parser.parse_args()
    return args


def convert(seconds):
    return time.strftime("%H:%M:%S", time.gmtime(seconds))


Appending to demo_model_distribution.py


### Définition de la boucle d'entraînement

A l'image de l'implémentation du modèle sur deux GPU, il faut maintenant bien définir où sont les entrées et sorties du modèle : 

* les échantillons images doivent être envoyés sur le premier GPU
* les labels, quand à eux, doivent se retrouver sur le même GPU que la sortie du modèle (ie les prédictions), soit le second GPU.

La sauvegarde du modèle (dans la fonction `training`) ne différe pas de la façon de procéder en mono GPU.

In [11]:
%%writefile -a demo_model_distribution.py

def train(model, optimizer, criterion, train_loader, batch_size, gpu):
    model.train()
    if idr_torch.rank == 0:
        running_train_loss = 0.0
        running_train_corrects = 0
    for batch_counter, (images, labels) in enumerate(train_loader):
        # images are sent to the first GPU
        images = images.to(gpu[0], non_blocking=True)
        # zero the parameter gradients
        optimizer.zero_grad()
        # forward
        with torch.set_grad_enabled(True):
            outputs = model(images)
            # labels (ground truth) are sent to the GPU where the outputs of the model
            # reside, which in this case is the second GPU 
            labels = labels.to(outputs.device, non_blocking=True)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)
            # backward + optimize only if in training phase
            loss.backward()
            optimizer.step()
        if idr_torch.rank == 0:
            # statistics
            running_train_loss += loss.item()
            running_train_corrects += torch.sum(preds == labels.data).item()
    if idr_torch.rank == 0:
        epoch_loss = running_train_loss / (batch_counter + 1)
        epoch_acc = 100.0 * running_train_corrects / ((batch_counter + 1) * batch_size)
        print(f'Epoch Train Loss: {epoch_loss:.2f} Acc: {epoch_acc:.2f}')
        
def training(model, train_loader, epochs, batch_size, gpu, checkpoint_path):
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), 1e-3)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)

    if idr_torch.rank == 0:
        total_time = time.time()

    for epoch in range(epochs):
        if idr_torch.rank == 0:
            print(f"Epoch {epoch + 1}/{epochs}")
            t = time.time()

        train(model, optimizer, criterion, train_loader, batch_size, gpu)
        scheduler.step()

        if idr_torch.rank == 0:
            duration = time.time() - t
            print(f"\t Duration : {duration:.2f}")
            print(f"Saving model at epoch {epoch}")
            name = f"{checkpoint_path}{epoch}.pt"
            torch.save(model.state_dict(), name)

    if idr_torch.rank == 0:
        total_time_elapsed = time.time() - total_time

    if idr_torch.rank == 0:
        print("-------------------------------------")
        print(f"Total time: {total_time_elapsed:.2f} \t {convert(total_time_elapsed)}")
        print("-------------------------------------")
    time.sleep(2)  # used only to get a clean log
    for g in gpu:
        print(f"Device id {g} max memory usage: {torch.cuda.max_memory_allocated(g) // (1024 * 1024)} GB")


Appending to demo_model_distribution.py


### Setup 




In [12]:
%%writefile -a demo_model_distribution.py

def get_model(gpu, load_model=None):
    mp_model = PipelinedResnet(dev0=gpu[0], dev1=gpu[1])
    if load_model:
        print(f"Loading model {load_model}")
        mp_model.load_state_dict(torch.load(load_model))
    return mp_model

def main():
    args = parse_args()
    batch_size, epochs, checkpoint_path = args.batch_size, args.epochs, args.checkpoint_path
    if idr_torch.rank == 0:
        print(f"Current setup:")
        print(f"\tTraining on {batch_size} samples per batch, for {epochs} epoch(s).")

    gpu = [0, 1]
    torch.cuda.set_device(0)
    train_loader = dataset.get_dataset(batch_size=batch_size)
    time.sleep(2)  # used only to get a clean log
    model = get_model(gpu=gpu, load_model=args.resume_model)
    time.sleep(2)  # used only to get a clean log
    training(model, train_loader, epochs, batch_size, gpu, checkpoint_path)


if __name__ == '__main__':
    main()

Appending to demo_model_distribution.py


## Exécution de la démo

Nous allons procéder en deux étapes, avec la même configuration (2 GPUs par modèle) :
    
* une première exécution de deux époques avec sauvegarde du modèle à la fin de chaque époque (idéalement, il ne faudrait sauvegarder le modèle que s'il y a amélioration de la Loss) 
* une seconde exécution avec chargement du modèle sauvegardé précédemment

Sur un noeud quadri-GPU, nous allons réserver la moitié des cartes graphiques, mais ne demander qu'un seul processus. Il est alors avantageux de prendre la moitié de la mémoire disponible, ce qui se fait via la directive `#SBATCH --cpus-per-task`. 

### Exécution du code sans récupération de modèle préentrainé

On prendra soin, dans le script Slurm, de bien définir le nombre de GPU et de tâches par noeud. 

**Rappel**:  si votre unique projet dispose d'heures CPU et GPU ou si votre login est rattaché à plusieurs projets, vous devez impérativement préciser l'attribution sur laquelle doit être décomptée les heures consommées par vos calculs, en ajoutant l'option `--account=my_project@gpu` comme indiqué dans la [documentation IDRIS](http://www.idris.fr/jean-zay/cpu/jean-zay-cpu-doc_account.html).

In [13]:
%%writefile slurm/prime_run.slurm
#!/bin/bash
#SBATCH --job-name=save
#SBATCH --output=log/prime_run.out
#SBATCH --error=log/prime_run.err
#SBATCH --gres=gpu:2
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --hint=nomultithread
#SBATCH --time=00:30:00
#SBATCH --qos=qos_gpu-dev
#SBATCH -C v100-16g
#SBATCH --cpus-per-task=20     # on a node with 4 GPU, this is half of the available CPU (and memory)


## load Pytorch module
module purge
module load pytorch-gpu/py3/1.8.0

## launch script on every node
set -x
time srun python -u demo_model_distribution.py -b 128 -e 2 -k "checkpoint/test_"
date

Overwriting slurm/prime_run.slurm


In [14]:
!sbatch 'slurm/prime_run.slurm'

Submitted batch job 1042280


In [15]:
import time

def check_job(name):
    sq = !squeue -u $USER -n '{name}'
    print(sq[0])
    while len(sq) >= 2:
        print(sq[1],end='\r')
        time.sleep(5)
        sq = !squeue -u $USER -n '{name}'
    print('\n Done!')

In [16]:
job_name = 'save' 
check_job(job_name)

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           1042280   gpu_p13     save  ssos021 CG      13:42      1 r10i0n3
 Done!


In [17]:
!cat "log/prime_run.err"

Loading pytorch-gpu/py3/1.8.0
  Loading requirement: gcc/8.3.1 cuda/10.2 nccl/2.8.3-1-cuda
    cudnn/8.0.4.30-cuda-10.2 intel-mkl/2020.4 magma/2.5.4-cuda
    openmpi/4.0.5-cuda
+ srun python -u demo_model_distribution.py -b 128 -e 2 -k checkpoint/test_

real	13m40.319s
user	0m0.013s
sys	0m0.009s
+ date


In [18]:
!cat "log/prime_run.out"

Current setup:
	Training on 128 samples per batch, for 2 epoch(s).
Epoch 1/2
Epoch Train Loss: 7.01 Acc: 0.10
	 Duration : 398.34
Saving model at epoch 0
Epoch 2/2
Epoch Train Loss: 6.93 Acc: 0.12
	 Duration : 390.42
Saving model at epoch 1
-------------------------------------
Total time: 790.20 	 00:13:10
-------------------------------------
Device id 0 max memory usage: 7950 GB
Device id 1 max memory usage: 8224 GB
Thu Apr 22 10:56:40 CEST 2021


### Exécution du code avec chargement de modèle avant de commencer l'entraînement

Le modèle précédemment créé lors de la première époque sera chargé lors de la création du modèle.  


In [19]:
!ls "checkpoint"

check_0.pt  check_1.pt	test_0.pt  test_1.pt


In [7]:
%%writefile slurm/second_run.slurm
#!/bin/bash
#SBATCH --job-name=load
#SBATCH --output=log/second_run.out
#SBATCH --error=log/second_run.err
#SBATCH --gres=gpu:2
#SBATCH --nodes=1
#SBATCH --ntasks-per-node=1
#SBATCH --hint=nomultithread
#SBATCH --time=00:30:00
#SBATCH --qos=qos_gpu-dev
#SBATCH -C v100-16g
#SBATCH --cpus-per-task=20


## load Pytorch module
module purge
module load pytorch-gpu/py3/1.8.0

## launch script on every node
set -x
time srun python -u demo_model_distribution.py -b 128 -e 2 -k "checkpoint/check_" -r "checkpoint/test_1.pt"
date

Writing slurm/second_run.slurm


In [21]:
!sbatch 'slurm/second_run.slurm'

Submitted batch job 1042756


In [22]:
job_name = 'load' 
check_job(job_name)

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           1042756   gpu_p13     load  ssos021 CG      14:59      1 r11i2n4
 Done!


In [23]:
!cat "log/second_run.err"

Loading pytorch-gpu/py3/1.8.0
  Loading requirement: gcc/8.3.1 cuda/10.2 nccl/2.8.3-1-cuda
    cudnn/8.0.4.30-cuda-10.2 intel-mkl/2020.4 magma/2.5.4-cuda
    openmpi/4.0.5-cuda
+ srun python -u demo_model_distribution.py -b 128 -e 2 -k checkpoint/check_ -r checkpoint/test_1.pt

real	14m1.860s
user	0m0.012s
sys	0m0.010s
+ date


In [24]:
!cat "log/second_run.out"

Current setup:
	Training on 128 samples per batch, for 2 epoch(s).
Loading model checkpoint/test_1.pt
Epoch 1/2
Epoch Train Loss: 6.92 Acc: 0.10
	 Duration : 403.39
Saving model at epoch 0
Epoch 2/2
Epoch Train Loss: 6.90 Acc: 0.13
	 Duration : 412.37
Saving model at epoch 1
-------------------------------------
Total time: 816.84 	 00:13:36
-------------------------------------
Device id 0 max memory usage: 7950 GB
Device id 1 max memory usage: 8219 GB
Thu Apr 22 11:12:40 CEST 2021
