# Data loading and preprocessing with PyTorch

## Hands-on practice

*Notebook written by the IDRIS AI support team, March 2021*

This document presents the recommended method on Jean Zay for loading and preprocessing input data for distributed training. It is based on the [PyTorch documentation](https://pytorch.org/docs/stable/data.html) and illustrates the [IDRIS documentation](http://www.idris.fr/jean-zay/gpu/jean-zay-gpu-torch-data-preprocessing.html).

This notebook consists of:
  * a [complete example](#exemple) of optimized loading
  * [comparison tests](#tests) of the performance gains provided by each feature presented in the documentation (distribution, multiprocessing, prefetching, etc.)

### Computing environment

This notebook can be run on any Jean Zay node, but we recommend using the Jean Zay JupyterHub front end (i.e. an *interactive* connection) to avoid consuming your allocation. In that case, the hostname is `jean-zay-srv2`:

In [1]:
!hostname

jean-zay-srv2


This notebook does not require loading any particular module. Jobs are submitted via Slurm in the `pytorch-gpu/py3/1.7.1` environment.

## Complete example of optimized loading <a class="anchor" id="exemple"></a>

### Creating the Python data-loading script - optimized version

In [2]:
%%writefile mnist_loader.py 
import os
import time
import torch
import torchvision
import idr_torch # IDRIS package available in all PyTorch modules - interface with Slurm

if idr_torch.rank == 0:
    print(f' --- Running on {idr_torch.size} GPU ---')

global_start_time=time.time()

# init multiprocess environment
torch.cuda.set_device(idr_torch.local_rank)
gpu = torch.device("cuda")

# define list of transformations to apply
data_transform = torchvision.transforms.Compose([torchvision.transforms.Resize((300,300)),
                                                 torchvision.transforms.ToTensor()])

# load mnist dataset and apply transformations
root = os.environ['DSDIR']
start_time = time.time()
mnist_dataset = torchvision.datasets.MNIST(root=root,
                                           transform=data_transform,
                                           download=False)
end_time = time.time()
if idr_torch.rank == 0:
    print(f'Rank {idr_torch.rank}: Loading dataset took {end_time - start_time} s')

# define distributed sampler
data_sampler = torch.utils.data.distributed.DistributedSampler(mnist_dataset,
                                                               shuffle=True,
                                                               num_replicas=idr_torch.size,
                                                               rank=idr_torch.rank)

# define DataLoader - optimized parameters
batch_size = 128                       # adjust batch size according to GPU type (16GB or 32GB in memory)
drop_last = True                       # set to False if it represents important information loss
num_workers = idr_torch.cpus_per_task  # define number of CPU workers per process
persistent_workers = True              # set to False if CPU RAM must be released
pin_memory = True                      # optimize CPU to GPU transfers
non_blocking = True                    # activate asynchronism to speed up CPU/GPU transfers
prefetch_factor = 2                    # adjust number of batches to preload

if idr_torch.rank == 0:
    print(f'------')
    print(f'Config: batch_size={batch_size}, drop_last={drop_last}') 
    print(f'        num_workers={num_workers}, persistent_workers={persistent_workers},')
    print(f'        pin_memory={pin_memory}, non_blocking={non_blocking}, prefetch_factor={prefetch_factor}')
    print(f'------')

dataloader = torch.utils.data.DataLoader(mnist_dataset,
                                         sampler=data_sampler,
                                         batch_size=batch_size,
                                         drop_last=drop_last,
                                         num_workers=num_workers,
                                         persistent_workers=persistent_workers,
                                         pin_memory=pin_memory,
                                         prefetch_factor=prefetch_factor
                                        )

# loop over batches
transfer_time=[]
len_dataloader=len(dataloader)
for i, (images, labels) in enumerate(dataloader):
    
    start_time = time.time()
    images = images.to(gpu, non_blocking=non_blocking)
    labels = labels.to(gpu, non_blocking=non_blocking)
    end_time = time.time()
    transfer_time.append(end_time - start_time)

mean_transfer_time = sum(transfer_time) / len(transfer_time)
max_transfer_time = max(transfer_time)
max_idx = transfer_time.index(max_transfer_time)
if idr_torch.rank == 0:
    print(f'Rank {idr_torch.rank}: Loop ended')
    print(f'Rank {idr_torch.rank}: Mean transfer time = {int(mean_transfer_time*10**6)} μs (max = {int(max_transfer_time*10**6)} μs, reached at {max_idx}th transfer)')


global_end_time=time.time()
if idr_torch.rank == 0:
    print(f'Rank {idr_torch.rank}: Global time = {global_end_time - global_start_time} s\n')

Overwriting mnist_loader.py
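
The script relies on `idr_torch` to expose the Slurm task layout (`rank`, `local_rank`, `size`, `cpus_per_task`). Its implementation is not shown in this notebook. As a rough sketch, and assuming such a helper simply reads the standard Slurm environment variables, equivalent values could be obtained as follows. The function name `slurm_layout` and the fallback defaults are hypothetical and only serve the illustration.

```python
import os

def slurm_layout(env=None):
    """Return the task layout read from standard Slurm environment variables.

    Hypothetical helper: the defaults describe a single-process run outside Slurm.
    """
    env = os.environ if env is None else env
    return {
        "rank": int(env.get("SLURM_PROCID", 0)),          # global task index
        "local_rank": int(env.get("SLURM_LOCALID", 0)),   # task index on the node
        "size": int(env.get("SLURM_NTASKS", 1)),          # total number of tasks
        "cpus_per_task": int(env.get("SLURM_CPUS_PER_TASK", 1)),
    }

# Example: environment seen by the third task of a 4-task, 10-CPU-per-task job
example_env = {
    "SLURM_PROCID": "2",
    "SLURM_LOCALID": "2",
    "SLURM_NTASKS": "4",
    "SLURM_CPUS_PER_TASK": "10",
}
layout = slurm_layout(example_env)
print(layout)
```

With one task per GPU, as in the submission script of this notebook, `local_rank` is the value passed to `torch.cuda.set_device` and `cpus_per_task` is the value used for `num_workers`.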


### Creating the Slurm submission script

**Reminder**: if your single project has both CPU and GPU hours, or if your login is attached to several projects, you must specify the allocation to which the hours consumed by your jobs are charged, by adding the option `--account=my_project@gpu` as described in the [IDRIS documentation](http://www.idris.fr/jean-zay/cpu/jean-zay-cpu-doc_account.html).

In [3]:
%%writefile job.slurm
#!/bin/bash
#SBATCH --job-name=data_loader_pytorch
##SBATCH --account=XXX@v100
#SBATCH --nodes=1
#SBATCH --ntasks=4
#SBATCH --gres=gpu:4
#SBATCH --cpus-per-task=10
#SBATCH --hint=nomultithread
#SBATCH --time=00:30:00
#SBATCH --output=data_loader_pytorch.out

module load pytorch-gpu/py3/1.7.1

srun python -u mnist_loader.py

Overwriting job.slurm


### Submitting and running the optimized version

In [4]:
import time
from IPython.display import clear_output
def display_slurm_queue():
    sq = !squeue -u $USER -n data_loader_pytorch
    while len(sq) >= 2:
        clear_output(wait=True)
        for l in sq: print(l)
        time.sleep(10)
        sq = !squeue -u $USER -n data_loader_pytorch
    print('\n Done!')
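
The cell above uses the IPython `!squeue` syntax, so it only works inside a notebook. As a sketch for plain Python scripts, the same polling loop could be written with `subprocess`. The function names `squeue_lines` and `wait_for_jobs` are hypothetical, and the `poll` and `sleep` arguments exist only so that the loop can be exercised without a Slurm installation.

```python
import subprocess
import time

def squeue_lines(job_name, user):
    """Return the squeue output lines (header included) for one job name."""
    result = subprocess.run(
        ["squeue", "-u", user, "-n", job_name],
        capture_output=True, text=True, check=True,
    )
    return result.stdout.splitlines()

def wait_for_jobs(poll, interval=10.0, sleep=time.sleep):
    """Call poll() until only the header line remains; return the number of polls."""
    n_polls = 1
    lines = poll()
    while len(lines) >= 2:          # header + at least one pending or running job
        sleep(interval)
        lines = poll()
        n_polls += 1
    return n_polls

# Typical use on the cluster (not executed here):
#   import os
#   wait_for_jobs(lambda: squeue_lines("data_loader_pytorch", os.environ["USER"]))
```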

In [5]:
# submit job
!sbatch job.slurm

Submitted batch job 943004


In [6]:
display_slurm_queue() 


 Done!


In [7]:
# display output
!cat data_loader_pytorch.out

Loading pytorch-gpu/py3/1.7.1
  Loading requirement: gcc/8.5.0 cuda/10.2 nccl/2.7.8-1-cuda
    cudnn/8.0.4.30-cuda-10.2 intel-mkl/2020.1 magma/2.5.3-cuda
    openmpi/4.0.2-cuda
 --- Running on 4 GPU ---
Rank 0: Loading dataset took 0.19827032089233398 s
------
Config: batch_size=128, drop_last=True
        num_workers=10, persistent_workers=True,
        pin_memory=True, non_blocking=True, prefetch_factor=2
------
Rank 0: Loop ended
Rank 0: Mean transfer time = 383 μs (max = 10445 μs, reached at 20th transfer)
Rank 0: Global time = 5.433711290359497 s
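
To make the effect of the distributed sampler concrete, the sketch below reproduces in plain Python how a dataset can be split across ranks and how many batches each rank then iterates over. It follows the default behavior of `DistributedSampler` as we understand it (padding by wrapping around, then a strided slice per rank) but leaves shuffling out. The helper names `shard_indices` and `batches_per_rank` are hypothetical.

```python
import math

def shard_indices(n_samples, num_replicas, rank):
    """Indices seen by one rank, without shuffling (padding wraps around)."""
    per_rank = math.ceil(n_samples / num_replicas)
    total = per_rank * num_replicas
    indices = list(range(n_samples))
    indices += indices[: total - n_samples]   # pad so every rank gets per_rank samples
    return indices[rank:total:num_replicas]

def batches_per_rank(n_samples, num_replicas, batch_size, drop_last=True):
    """Number of batches each rank iterates over in one epoch."""
    per_rank = math.ceil(n_samples / num_replicas)
    if drop_last:
        return per_rank // batch_size
    return math.ceil(per_rank / batch_size)

print(shard_indices(10, 4, 0))            # [0, 4, 8]
print(shard_indices(10, 4, 3))            # [3, 7, 1]
print(batches_per_rank(60000, 4, 128))    # 117
```

With the 60000 images of the MNIST training set, 4 GPUs and `batch_size=128`, each rank therefore processes 117 batches of the 15000 samples assigned to it.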



## Tests of the individual optimizations <a class="anchor" id="tests"></a>

Here we want to observe the impact of the parameters presented in the IDRIS documentation on data preprocessing performance.
The parameters of interest in these tests are:
  * the number of GPUs (distribution)
  * the batch size
  * the number of workers (multiprocessing)
  * memory pinning and asynchronous CPU/GPU transfers
  * batch prefetching (`prefetch_factor`)

Note that these tests are run on the MNIST dataset, which is small. This choice was made for teaching purposes, so that the runs and the performance comparison complete quickly. The goal here is to demonstrate the benefit of each optimization. The performance gain is likely to be considerably larger on a bigger dataset.

----

* Creating directories to store
    * the Slurm submission scripts
    * the Python data-loading scripts
    * the standard output of the runs

In [8]:
!mkdir slurm
!mkdir scripts
!mkdir logs

mkdir: cannot create directory ‘slurm’: File exists
mkdir: cannot create directory ‘scripts’: File exists
mkdir: cannot create directory ‘logs’: File exists


* Preliminary helper that generates the Python and Slurm scripts with variable parameters

In [9]:
def create_new_scripts(ngpus=1, batch_size=8, num_workers=1, pin_memory=False, non_blocking=False, prefetch_factor=1):
    
    # common suffix of all generated file names (non_blocking is not part of it)
    tag = f'{ngpus}_{batch_size}_{num_workers}_{pin_memory}_{prefetch_factor}'
    slurm_fname = f'slurm/job_{tag}.slurm'
    script_fname = f'scripts/mnist_loader_{tag}.py'
    
    # create slurm submission script with new number of gpus
    with open("job.slurm", "r") as ref_file, open(slurm_fname, "w") as new_file:
        for line in ref_file:
            stripped = line.strip()
            if stripped.startswith('#SBATCH --ntasks='):
                line = f'#SBATCH --ntasks={ngpus}\n'
            elif stripped.startswith('#SBATCH --gres=gpu:'):
                line = f'#SBATCH --gres=gpu:{ngpus}\n'
            elif stripped.startswith('#SBATCH --output='):
                line = f'#SBATCH --output=logs/data_loader_{tag}.out\n'
            elif stripped.startswith('srun'):
                line = f'srun python -u {script_fname}\n'
            new_file.write(line)
            
    # create python script with new parameters
    with open("mnist_loader.py", "r") as ref_file, open(script_fname, "w") as new_file:
        for line in ref_file:
            stripped = line.strip()
            if stripped.startswith('batch_size = '):
                line = f'batch_size = {batch_size}\n'
            elif stripped.startswith('num_workers = '):
                line = f'num_workers = {num_workers}\n'
            elif stripped.startswith('pin_memory = '):
                line = f'pin_memory = {pin_memory}\n'
            elif stripped.startswith('non_blocking = '):
                line = f'non_blocking = {non_blocking}\n'
            elif stripped.startswith('prefetch_factor = '):
                line = f'prefetch_factor = {prefetch_factor}\n'
            new_file.write(line)
          
    return slurm_fname

### Reference results -- under-optimized version

The reference result corresponds to an under-optimized set of parameters:
* number of GPUs = 1
* batch size = 8
* multiprocessing enabled but with a single worker
* memory not pinned and synchronous CPU/GPU transfers
* prefetching of a single batch at a time

In [10]:
# create and execute reference scripts
slurm_fname = create_new_scripts()
print(slurm_fname)
!sbatch $slurm_fname

slurm/job_1_8_1_False_1.slurm
Submitted batch job 943026


In [11]:
display_slurm_queue()


 Done!


In [12]:
!cat logs/data_loader_1_8_1_False_1.out

Loading pytorch-gpu/py3/1.7.1
  Loading requirement: gcc/8.5.0 cuda/10.2 nccl/2.7.8-1-cuda
    cudnn/8.0.4.30-cuda-10.2 intel-mkl/2020.1 magma/2.5.3-cuda
    openmpi/4.0.2-cuda
 --- Running on 1 GPU ---
Rank 0: Loading dataset took 0.18850302696228027 s
------
Config: batch_size=8, drop_last=True
        num_workers=1, persistent_workers=True,
        pin_memory=False, non_blocking=False, prefetch_factor=1
------
Rank 0: Loop ended
Rank 0: Mean transfer time = 1047 μs (max = 1370933 μs, reached at 0th transfer)
Rank 0: Global time = 61.70293164253235 s



### Number of GPUs

* Estimate of the time saved when the number of GPUs is increased

In [13]:
# create and execute scripts with increasing number of gpus (ngpus = 1 already done in ref job)
for ngpus in [2, 3, 4]:
    slurm_fname = create_new_scripts(ngpus=ngpus)
    !sbatch $slurm_fname

Submitted batch job 943046
Submitted batch job 943054
Submitted batch job 943055


In [14]:
display_slurm_queue()


 Done!


In [15]:
%%bash
for i in 1 2 3 4
do
    echo ">>> Ngpus = $i" 
    grep "Global time" logs/data_loader_${i}_8_1_False_1.out
done

>>> Ngpus = 1
Rank 0: Global time = 61.70293164253235 s
>>> Ngpus = 2
Rank 0: Global time = 31.87321448326111 s
>>> Ngpus = 3
Rank 0: Global time = 21.444392442703247 s
>>> Ngpus = 4
Rank 0: Global time = 17.067098379135132 s
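
The global times above can be turned into a speedup and a parallel efficiency relative to the single-GPU run. The short sketch below only reuses the figures printed in this notebook. Since the global time also includes start-up and dataset loading, the result should be read as an indication rather than a precise scaling measurement.

```python
# Global times (s) reported above, keyed by number of GPUs
global_time = {
    1: 61.70293164253235,
    2: 31.87321448326111,
    3: 21.444392442703247,
    4: 17.067098379135132,
}

t1 = global_time[1]
# speedup = T(1) / T(n), efficiency = speedup / n
scaling = {n: (t1 / t, t1 / t / n) for n, t in global_time.items()}

for n, (speedup, efficiency) in scaling.items():
    print(f"{n} GPU: speedup = {speedup:.2f}, efficiency = {efficiency:.0%}")
```

On these runs the speedup stays close to linear: roughly 1.9 on 2 GPUs, 2.9 on 3 GPUs and 3.6 on 4 GPUs, i.e. an efficiency of about 90 % or more.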


### Batch size

* Estimate of the time saved when the batch size is increased

In [16]:
# create and execute scripts with increasing batch size (batch_size=8 already done in ref job)
for batch_size in [16, 32, 64, 128]:
    slurm_fname = create_new_scripts(batch_size=batch_size)
    !sbatch $slurm_fname

Submitted batch job 943076
Submitted batch job 943079
Submitted batch job 943081
Submitted batch job 943087


In [17]:
display_slurm_queue()


 Done!


In [18]:
%%bash
for size in 8 16 32 64 128
do
    echo ">>> batch_size = $size" 
    grep "Global time" logs/data_loader_1_${size}_1_False_1.out
done

>>> batch_size = 8
Rank 0: Global time = 61.70293164253235 s
>>> batch_size = 16
Rank 0: Global time = 57.32425045967102 s
>>> batch_size = 32
Rank 0: Global time = 57.850083112716675 s
>>> batch_size = 64
Rank 0: Global time = 58.90222120285034 s
>>> batch_size = 128
Rank 0: Global time = 56.41951251029968 s
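
To help read these figures, the sketch below derives the number of iterations and an approximate throughput for each batch size, using only the times printed above and the 60000 images of the MNIST training set. The throughput is computed from the global time, which also contains start-up costs, so it is a slight underestimate.

```python
n_samples = 60000  # size of the MNIST training set

# Global times (s) reported above, keyed by batch size
global_time = {
    8: 61.70293164253235,
    16: 57.32425045967102,
    32: 57.850083112716675,
    64: 58.90222120285034,
    128: 56.41951251029968,
}

stats = {}
for batch_size, t in global_time.items():
    n_batches = n_samples // batch_size      # drop_last=True
    throughput = n_samples / t               # samples per second
    stats[batch_size] = (n_batches, throughput)
    print(f"batch_size={batch_size:>3}: {n_batches:>4} batches, ~{throughput:.0f} samples/s")
```

The number of iterations is divided by 16 between the two extremes while the throughput stays around 1000 samples/s. A plausible reading, under the assumption that the single worker is the bottleneck, is that the per-sample preprocessing cost dominates in this configuration, so a larger batch alone changes little.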


### Multiprocessing

* Estimate of the time saved when the number of workers is increased

In [19]:
# create and execute scripts with increasing number of workers (num_workers=1 already done in ref job)
for num_workers in [2,4,6,8,10]:
    slurm_fname = create_new_scripts(num_workers=num_workers)
    !sbatch $slurm_fname

Submitted batch job 943116
Submitted batch job 943121
Submitted batch job 943124
Submitted batch job 943126
Submitted batch job 943130


In [20]:
display_slurm_queue()


 Done!


In [21]:
%%bash
for n in 1 2 4 6 8 10
do
    echo ">>> num_workers = $n" 
    grep "Global time" logs/data_loader_1_8_${n}_False_1.out
done

>>> num_workers = 1
Rank 0: Global time = 61.70293164253235 s
>>> num_workers = 2
Rank 0: Global time = 31.790295839309692 s
>>> num_workers = 4
Rank 0: Global time = 18.65254020690918 s
>>> num_workers = 6
Rank 0: Global time = 19.208420276641846 s
>>> num_workers = 8
Rank 0: Global time = 18.30638575553894 s
>>> num_workers = 10
Rank 0: Global time = 18.34398341178894 s
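
The gain is large up to 4 workers and then levels off. One simple way to locate that plateau from the measurements is to take the smallest worker count whose time is within a given tolerance of the best run. The 5 % tolerance used below is an arbitrary, hypothetical threshold chosen for the illustration.

```python
# Global times (s) reported above, keyed by number of workers
global_time = {
    1: 61.70293164253235,
    2: 31.790295839309692,
    4: 18.65254020690918,
    6: 19.208420276641846,
    8: 18.30638575553894,
    10: 18.34398341178894,
}

tolerance = 0.05                       # hypothetical: within 5 % of the best run
best = min(global_time.values())
plateau_start = min(n for n, t in global_time.items() if t <= best * (1 + tolerance))
speedup_at_plateau = global_time[1] / global_time[plateau_start]

print(f"plateau reached from {plateau_start} workers "
      f"(speedup vs 1 worker: {speedup_at_plateau:.1f})")
```

On these runs the plateau starts at 4 workers, with a speedup of about 3.3 compared with a single worker. The remaining differences between 4 and 10 workers are of the same order as run-to-run variations.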


### CPU/GPU transfers

* Estimate of the time saved when memory is pinned and asynchronous transfers are enabled

In [22]:
# create and execute scripts with optimized CPU/GPU transfers (pin_memory=False already done in ref job)
slurm_fname = create_new_scripts(pin_memory=True, non_blocking=True)
!sbatch $slurm_fname

Submitted batch job 943166


In [23]:
display_slurm_queue()


 Done!


In [24]:
%%bash
echo ">>> Pin memory = False "
grep "Mean transfer time" logs/data_loader_1_8_1_False_1.out
grep "Global time" logs/data_loader_1_8_1_False_1.out
echo ">>> Pin memory = True "
grep "Mean transfer time" logs/data_loader_1_8_1_True_1.out
grep "Global time" logs/data_loader_1_8_1_True_1.out

>>> Pin memory = False 
Rank 0: Mean transfer time = 1047 μs (max = 1370933 μs, reached at 0th transfer)
Rank 0: Global time = 61.70293164253235 s
>>> Pin memory = True 
Rank 0: Mean transfer time = 87 μs (max = 716 μs, reached at 0th transfer)
Rank 0: Global time = 70.56815028190613 s
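
The mean transfer time drops sharply while the global time does not improve. The sketch below puts the two figures on the same scale by multiplying the mean transfer time by the number of timed transfers (60000 / 8 = 7500 batches in this configuration). It only reuses the values printed above.

```python
n_batches = 60000 // 8   # number of transfers timed in each run

runs = {
    "pin_memory=False": {"mean_transfer_us": 1047, "global_s": 61.70293164253235},
    "pin_memory=True": {"mean_transfer_us": 87, "global_s": 70.56815028190613},
}

for name, run in runs.items():
    # total time spent in the timed .to() calls over the whole loop
    run["total_transfer_s"] = run["mean_transfer_us"] * 1e-6 * n_batches
    run["share"] = run["total_transfer_s"] / run["global_s"]
    print(f"{name}: {run['total_transfer_s']:.2f} s in transfers "
          f"({run['share']:.1%} of global time)")
```

The timed transfers add up to roughly 7.9 s without pinned memory and 0.65 s with it, so the higher global time of the second run is not explained by the transfers themselves and may simply reflect run-to-run variability or other costs. Note also that with `non_blocking=True` the `.to()` call can return before the copy has completed, so the timer in the script may capture only the launch of the copy since no synchronization is performed.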


### Batch prefetching

* Estimate of the time saved when the number of prefetched batches is increased

In [25]:
# create and execute scripts with increasing prefetch_factor (prefetch_factor=1 already done in ref job)
for prefetch_factor in [2,3,4]:
    slurm_fname = create_new_scripts(prefetch_factor=prefetch_factor)
    !sbatch $slurm_fname

Submitted batch job 943222
Submitted batch job 943225
Submitted batch job 943226


In [26]:
display_slurm_queue()


 Done!


In [27]:
%%bash
for factor in 1 2 3 4
do
    echo ">>> prefetch_factor = $factor" 
    grep "Global time" logs/data_loader_1_8_1_False_${factor}.out
done

>>> prefetch_factor = 1
Rank 0: Global time = 61.70293164253235 s
>>> prefetch_factor = 2
Rank 0: Global time = 52.38763475418091 s
>>> prefetch_factor = 3
Rank 0: Global time = 54.913636207580566 s
>>> prefetch_factor = 4
Rank 0: Global time = 55.82939147949219 s
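
Prefetching has a cost in host memory, because `prefetch_factor` is the number of batches loaded in advance by each worker. As a rough sketch, the size of the image tensors that can be waiting in the queues is about `num_workers * prefetch_factor` batches. The helper `prefetch_buffer_bytes` is hypothetical. It assumes float32 tensors of shape (1, 300, 300), as produced by the `Resize((300,300))` and `ToTensor()` transformations applied to MNIST, and ignores labels and any other overhead.

```python
def prefetch_buffer_bytes(batch_size, num_workers, prefetch_factor,
                          sample_shape=(1, 300, 300), bytes_per_value=4):
    """Approximate size in bytes of the image batches that can be queued."""
    values_per_sample = 1
    for dim in sample_shape:
        values_per_sample *= dim
    batch_bytes = batch_size * values_per_sample * bytes_per_value
    return num_workers * prefetch_factor * batch_bytes

reference = prefetch_buffer_bytes(batch_size=8, num_workers=1, prefetch_factor=1)
optimized = prefetch_buffer_bytes(batch_size=128, num_workers=10, prefetch_factor=2)

print(f"reference config: {reference / 1e6:.2f} MB")   # 2.88 MB
print(f"optimized config: {optimized / 1e6:.1f} MB")   # 921.6 MB
```

This estimate is per process, so with one process per GPU it has to be multiplied by the number of tasks running on the node when choosing `prefetch_factor`.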
