# Data loading and pre-processing with PyTorch

## Implementation

*Notebook written by the IDRIS AI support team, March 2021*

This document describes how to load and pre-process input data for distributed training on Jean Zay. It illustrates the [IDRIS documentation](http://www.idris.fr/eng/jean-zay/gpu/jean-zay-gpu-torch-data-preprocessing-eng.html)
and uses the [PyTorch documentation](https://pytorch.org/docs/stable/data.html) as a reference.

This Notebook contains: 
 * A [complete example](#exemple) of optimised loading.
 * [Comparison tests](#tests) of performance gains offered by each functionality described in the documentation (distribution, multiprocessing, prefetching, etc.).

### Computing environment

This notebook can be executed on any Jean Zay node but we advise using the jupyterhub front-end node (i.e. an *interactive* connection) to avoid consuming your allocation. In this case, the hostname is `jean-zay-srv2`:

In [1]:
!hostname

jean-zay-srv2


You don't need to load any specific PyTorch module to run this notebook. The jobs will be submitted via Slurm in the `pytorch-gpu/py3/1.7.1` environment.

## Complete example of optimised loading 

### Creation of the data loading Python script - optimised version

In [2]:
%%writefile mnist_loader.py 
import os
import time
import torch
import torchvision
import idr_torch # IDRIS package available in all PyTorch modules - interface with Slurm

if idr_torch.rank == 0:
 print(f' --- Running on {idr_torch.size} GPU ---')

global_start_time=time.time()

# init multiprocess environment
torch.cuda.set_device(idr_torch.local_rank)
gpu = torch.device("cuda")

# define list of transformations to apply
data_transform = torchvision.transforms.Compose([torchvision.transforms.Resize((300,300)),
 torchvision.transforms.ToTensor()])

# load mnist dataset and apply transformations
root = os.environ['DSDIR']
start_time = time.time()
mnist_dataset = torchvision.datasets.MNIST(root=root,
 transform=data_transform,
 download=False)
end_time = time.time()
if idr_torch.rank == 0:
 print(f'Rank {idr_torch.rank}: Loading dataset took {end_time - start_time} s')

# define distributed sampler
data_sampler = torch.utils.data.distributed.DistributedSampler(mnist_dataset,
 shuffle=True,
 num_replicas=idr_torch.size,
 rank=idr_torch.rank)

# define DataLoader - optimised parameters
batch_size = 128 # adjust batch size according to GPU type (16GB or 32GB in memory)
drop_last = True # set to False if it represents important information loss
num_workers = idr_torch.cpus_per_task # define number of CPU workers per process
persistent_workers = True # set to False if CPU RAM must be released
pin_memory = True # optimise CPU to GPU transfers
non_blocking = True # activate asynchronism to speed up CPU/GPU transfers
prefetch_factor = 2 # adjust number of batches to preload

if idr_torch.rank == 0:
 print(f'------')
 print(f'Config: batch_size={batch_size}, drop_last={drop_last}') 
 print(f' num_workers={num_workers}, persistent_workers={persistent_workers},')
 print(f' pin_memory={pin_memory}, non_blocking={non_blocking}, prefetch_factor={prefetch_factor}')
 print(f'------')

dataloader = torch.utils.data.DataLoader(mnist_dataset,
 sampler=data_sampler,
 batch_size=batch_size,
 drop_last=drop_last,
 num_workers=num_workers,
 persistent_workers=persistent_workers,
 pin_memory=pin_memory,
 prefetch_factor=prefetch_factor
 )

# loop over batches
transfer_time=[]
len_dataloader=len(dataloader)
for i, (images, labels) in enumerate(dataloader):
 
 start_time = time.time()
 images = images.to(gpu, non_blocking=non_blocking)
 labels = labels.to(gpu, non_blocking=non_blocking)
 end_time = time.time()
 transfer_time.append(end_time - start_time)

mean_transfer_time = sum(transfer_time) / len(transfer_time)
max_transfer_time = max(transfer_time)
max_idx = transfer_time.index(max(transfer_time))
if idr_torch.rank == 0:
 print(f'Rank {idr_torch.rank}: Loop ended')
 print(f'Rank {idr_torch.rank}: Mean transfer time = {int(mean_transfer_time*10**6)} μs (max = {int(max_transfer_time*10**6)} μs, reached at {max_idx}th transfer)')


global_end_time=time.time()
if idr_torch.rank == 0:
 print(f'Rank {idr_torch.rank}: Global time = {global_end_time - global_start_time} s\n')

Overwriting mnist_loader.py
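
To make the role of `DistributedSampler` in the script above more concrete, here is a minimal pure-Python sketch of the kind of strided partition it performs. It is a simplified model written for illustration, not the PyTorch implementation: the function name `partition_indices` and its `seed` and `epoch` parameters are hypothetical, and the padding step assumes the sampler's default behaviour of repeating the first indices so that every rank receives the same number of samples.

```python
import math
import random


def partition_indices(n_samples, num_replicas, rank, shuffle=True, seed=0, epoch=0):
    """Return the sample indices assigned to one rank (simplified model)."""
    indices = list(range(n_samples))
    if shuffle:
        # Every rank must use the same seed so that all ranks build the same
        # permutation and the shards do not overlap.
        random.Random(seed + epoch).shuffle(indices)
    # Pad with the first indices so the total is divisible by num_replicas.
    per_rank = math.ceil(n_samples / num_replicas)
    total = per_rank * num_replicas
    indices += indices[: total - len(indices)]
    # Strided selection: rank r takes positions r, r + num_replicas, ...
    return indices[rank:total:num_replicas]


shards = [partition_indices(10, num_replicas=4, rank=r, shuffle=False) for r in range(4)]
for r, shard in enumerate(shards):
    print(f"rank {r}: {shard}")
```

In this sketch the `epoch` argument changes the permutation from one epoch to the next. With the real sampler, the equivalent step in a multi-epoch training loop is to call `data_sampler.set_epoch(epoch)` at the start of each epoch.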


### Creation of the Slurm submission script

**Reminder**: If your single project has both CPU and GPU hours, or if your login is attached to more than one project, you must specify for which allocation the consumed hours should be counted by adding the option `--account=my_project@gpu` as explained in the [IDRIS documentation](http://www.idris.fr/eng/jean-zay/cpu/jean-zay-cpu-doc_account-eng.html).

In [3]:
%%writefile job.slurm
#!/bin/bash
#SBATCH --job-name=data_loader_pytorch-eng
##SBATCH --account=XXX@v100
#SBATCH --nodes=1
#SBATCH --ntasks=4
#SBATCH --gres=gpu:4
#SBATCH --cpus-per-task=10
#SBATCH --hint=nomultithread
#SBATCH --time=00:30:00
#SBATCH --output=data_loader_pytorch.out

module load pytorch-gpu/py3/1.7.1

srun python -u mnist_loader.py

Overwriting job.slurm


### Submission and execution of the optimised version

In [4]:
import time
from IPython.display import clear_output
def display_slurm_queue():
    # squeue prints a header line, so two or more lines mean a job is still queued or running
    sq = !squeue -u $USER -n data_loader_pytorch-eng
    while len(sq) >= 2:
        clear_output(wait=True)
        for l in sq: print(l)
        time.sleep(10)
        sq = !squeue -u $USER -n data_loader_pytorch-eng
    print('\n Done!')

In [5]:
# submit job
!sbatch job.slurm

Submitted batch job 943007


In [6]:
display_slurm_queue()


 Done!


In [7]:
# display output
!cat data_loader_pytorch.out

Loading pytorch-gpu/py3/1.7.1
 Loading requirement: gcc/8.5.0 cuda/10.2 nccl/2.7.8-1-cuda
 cudnn/8.0.4.30-cuda-10.2 intel-mkl/2020.1 magma/2.5.3-cuda
 openmpi/4.0.2-cuda
 --- Running on 4 GPU ---
Rank 0: Loading dataset took 0.11383652687072754 s
------
Config: batch_size=128, drop_last=True
 num_workers=10, persistent_workers=True,
 pin_memory=True, non_blocking=True, prefetch_factor=2
------
Rank 0: Loop ended
Rank 0: Mean transfer time = 188 μs (max = 1305 μs, reached at 59th transfer)
Rank 0: Global time = 5.309845685958862 s
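
The log above reports a maximum at the 59th transfer, which is consistent with the number of iterations each process performs. A rough sketch of that arithmetic follows, assuming the MNIST training split holds 60,000 images (the default split of `torchvision.datasets.MNIST`) and using a hypothetical helper name `batches_per_rank`:

```python
import math


def batches_per_rank(n_samples, n_ranks, batch_size, drop_last=True):
    """Number of batches each process iterates over in one epoch."""
    # The distributed sampler gives each rank an equal share (rounded up).
    samples_per_rank = math.ceil(n_samples / n_ranks)
    if drop_last:
        # The incomplete final batch is discarded.
        return samples_per_rank // batch_size
    return math.ceil(samples_per_rank / batch_size)


four_gpus = batches_per_rank(60000, n_ranks=4, batch_size=128)
one_gpu = batches_per_rank(60000, n_ranks=1, batch_size=8)
print(f"4 GPUs, batch_size=128: {four_gpus} batches per process")
print(f"1 GPU,  batch_size=8:   {one_gpu} batches per process")
```

Under these assumptions, each of the 4 processes iterates over 117 batches and, because `drop_last = True`, skips the last 24 of its 15,000 samples.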



## Tests of the different optimisations 

Here we examine how the parameters presented in the IDRIS documentation affect the performance of data pre-processing.
The parameters of interest in these tests are:
* Number of GPUs (distribution) 
* Batch size 
* Number of workers (multiprocessing)
* Memory pinning and asynchronism of CPU/GPU transfers
* Pre-fetching batches for the GPUs

Note that these tests are run on the MNIST database, which is small. This choice was made for educational purposes so that the executions and the performance comparisons can be done rapidly. The aim here is to demonstrate the benefit of each optimisation. The performance gain could potentially be much greater on a larger database.

-----

* Creation of directories to store:
    * Slurm submission scripts
    * Python scripts for data loading
    * Standard outputs of executions

In [8]:
!mkdir slurm
!mkdir scripts
!mkdir logs

mkdir: cannot create directory ‘slurm’: File exists
mkdir: cannot create directory ‘scripts’: File exists
mkdir: cannot create directory ‘logs’: File exists


* Preliminary creation of Python and Slurm scripts with variable parameters

In [9]:
def create_new_scripts(ngpus=1, batch_size=8, num_workers=1, pin_memory=False, non_blocking=False, prefetch_factor=1):

    slurm_fname=f'slurm/job_{ngpus}_{batch_size}_{num_workers}_{pin_memory}_{prefetch_factor}.slurm'
    script_fname=f'scripts/mnist_loader_{ngpus}_{batch_size}_{num_workers}_{pin_memory}_{prefetch_factor}.py'

    # create slurm submission script with new number of gpus
    with open("job.slurm","r") as ref_file, open(slurm_fname,"w") as new_file:
        for line in ref_file:
            if line.strip().startswith('#SBATCH --ntasks='):
                line = f'#SBATCH --ntasks={ngpus}\n'
            elif line.strip().startswith('#SBATCH --gres=gpu:'):
                line = f'#SBATCH --gres=gpu:{ngpus}\n'
            elif line.strip().startswith('#SBATCH --output='):
                line = f'#SBATCH --output=logs/data_loader_{ngpus}_{batch_size}_{num_workers}_{pin_memory}_{prefetch_factor}.out\n'
            elif line.strip().startswith('srun'):
                line = f'srun python -u {script_fname}\n'
            # lines that match no pattern are copied unchanged
            new_file.write(line)

    # create python script with new parameters
    with open("mnist_loader.py","r") as ref_file, open(script_fname,"w") as new_file:
        for line in ref_file:
            if line.strip().startswith('batch_size = '):
                line = f'batch_size = {batch_size}\n'
            elif line.strip().startswith('num_workers = '):
                line = f'num_workers = {num_workers}\n'
            elif line.strip().startswith('pin_memory = '):
                line = f'pin_memory = {pin_memory}\n'
            elif line.strip().startswith('non_blocking = '):
                line = f'non_blocking = {non_blocking}\n'
            elif line.strip().startswith('prefetch_factor = '):
                line = f'prefetch_factor = {prefetch_factor}\n'
            # lines that match no pattern are copied unchanged
            new_file.write(line)

    return slurm_fname

### Reference results of an under-optimised version

The reference results correspond to an under-optimised version with the following parameters:
* Number of GPUs = 1 
* Batch size = 8 
* Multiprocessing activated but only one worker 
* Non-pinned memory and synchronous CPU/GPU transfers
* Pre-fetching of only one batch at a time

In [10]:
# create and execute reference scripts
slurm_fname = create_new_scripts()
print(slurm_fname)
!sbatch $slurm_fname

slurm/job_1_8_1_False_1.slurm
Submitted batch job 943023


In [11]:
display_slurm_queue()


 Done!


In [12]:
!cat logs/data_loader_1_8_1_False_1.out

Loading pytorch-gpu/py3/1.7.1
 Loading requirement: gcc/8.5.0 cuda/10.2 nccl/2.7.8-1-cuda
 cudnn/8.0.4.30-cuda-10.2 intel-mkl/2020.1 magma/2.5.3-cuda
 openmpi/4.0.2-cuda
 --- Running on 1 GPU ---
Rank 0: Loading dataset took 0.050215721130371094 s
------
Config: batch_size=8, drop_last=True
 num_workers=1, persistent_workers=True,
 pin_memory=False, non_blocking=False, prefetch_factor=1
------
Rank 0: Loop ended
Rank 0: Mean transfer time = 1066 μs (max = 1399016 μs, reached at 0th transfer)
Rank 0: Global time = 64.90769052505493 s



### Number of GPUs

* Estimate of time gain when the number of GPUs is increased

In [13]:
# create and execute scripts with increasing number of gpus (ngpus = 1 already done in ref job)
for ngpus in [2, 3, 4]:
 slurm_fname = create_new_scripts(ngpus=ngpus)
 !sbatch $slurm_fname

Submitted batch job 943044
Submitted batch job 943045
Submitted batch job 943053


In [14]:
display_slurm_queue()


 Done!


In [15]:
%%bash
for i in 1 2 3 4
do
 echo ">>> Ngpus = $i" 
 grep "Global time" logs/data_loader_${i}_8_1_False_1.out
done

>>> Ngpus = 1
Rank 0: Global time = 64.90769052505493 s
>>> Ngpus = 2
Rank 0: Global time = 32.32062292098999 s
>>> Ngpus = 3
Rank 0: Global time = 22.028196811676025 s
>>> Ngpus = 4
Rank 0: Global time = 17.05109715461731 s
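
The timings above can be turned into speedup and parallel efficiency figures. The sketch below copies the measured global times by hand (rounded to two decimals) and uses the usual definitions, speedup = T(1) / T(n) and efficiency = speedup / n. The variable names are illustrative.

```python
# Global times in seconds, copied from the output above (rounded).
global_time = {1: 64.91, 2: 32.32, 3: 22.03, 4: 17.05}

baseline = global_time[1]
scaling = {}
for ngpus, t in global_time.items():
    speedup = baseline / t
    efficiency = speedup / ngpus
    scaling[ngpus] = (speedup, efficiency)
    print(f"{ngpus} GPU(s): speedup = {speedup:.2f}, efficiency = {efficiency:.0%}")
```

On these measurements the scaling is close to linear: the run is about 3.8 times faster on 4 GPUs than on 1.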


### Batch size

* Estimate of time gain when the batch size is increased

In [16]:
# create and execute scripts with increasing batch size (batch_size=8 already done in ref job)
for batch_size in [16, 32, 64, 128]:
 slurm_fname = create_new_scripts(batch_size=batch_size)
 !sbatch $slurm_fname

Submitted batch job 943066
Submitted batch job 943067
Submitted batch job 943068
Submitted batch job 943070


In [17]:
display_slurm_queue()


 Done!


In [18]:
%%bash
for size in 8 16 32 64 128
do
 echo ">>> batch_size = $size" 
 grep "Global time" logs/data_loader_1_${size}_1_False_1.out
done

>>> batch_size = 8
Rank 0: Global time = 64.90769052505493 s
>>> batch_size = 16
Rank 0: Global time = 56.986307859420776 s
>>> batch_size = 32
Rank 0: Global time = 56.82447123527527 s
>>> batch_size = 64
Rank 0: Global time = 58.25776219367981 s
>>> batch_size = 128
Rank 0: Global time = 57.39804220199585 s


### Multiprocessing

* Estimate of the time gain when the number of workers is increased

In [19]:
# create and execute scripts with increasing number of workers (num_workers=1 already done in ref job)
for num_workers in [2,4,6,8,10]:
 slurm_fname = create_new_scripts(num_workers=num_workers)
 !sbatch $slurm_fname

Submitted batch job 943112
Submitted batch job 943113
Submitted batch job 943114
Submitted batch job 943115
Submitted batch job 943117


In [20]:
display_slurm_queue()


 Done!


In [21]:
%%bash
for n in 1 2 4 6 8 10
do
 echo ">>> num_workers = $n" 
 grep "Global time" logs/data_loader_1_8_${n}_False_1.out
done

>>> num_workers = 1
Rank 0: Global time = 64.90769052505493 s
>>> num_workers = 2
Rank 0: Global time = 32.45805883407593 s
>>> num_workers = 4
Rank 0: Global time = 17.409164667129517 s
>>> num_workers = 6
Rank 0: Global time = 17.93054223060608 s
>>> num_workers = 8
Rank 0: Global time = 18.30246901512146 s
>>> num_workers = 10
Rank 0: Global time = 19.45599937438965 s
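
One way to read these measurements is to look for the smallest number of workers whose time is close to the best one, since additional workers beyond that point bring no further gain here. The sketch below applies such a rule to the times above. The 5% tolerance and the helper name `smallest_sufficient` are arbitrary illustrative choices, not an IDRIS recommendation.

```python
# Global times in seconds, copied from the output above (rounded).
time_by_workers = {1: 64.91, 2: 32.46, 4: 17.41, 6: 17.93, 8: 18.30, 10: 19.46}


def smallest_sufficient(times, tolerance=0.05):
    """Smallest setting whose time is within `tolerance` of the best time."""
    best = min(times.values())
    return min(n for n, t in times.items() if t <= best * (1 + tolerance))


chosen = smallest_sufficient(time_by_workers)
print(f"best time: {min(time_by_workers.values())} s, chosen num_workers: {chosen}")
```

For this small dataset the gain levels off at 4 workers. With heavier pre-processing the saturation point may well be higher, so the measurement is worth repeating on the real workload.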


### CPU/GPU transfers

* Estimate of the time gain when the memory is pinned and asynchronism is activated

In [22]:
# create and execute scripts with optimised CPU/GPU transfers (pin_memory=False already done in ref job)
slurm_fname = create_new_scripts(pin_memory=True, non_blocking=True)
!sbatch $slurm_fname

Submitted batch job 943139


In [23]:
display_slurm_queue()


 Done!


In [24]:
%%bash
echo ">>> Pin memory = False "
grep "Mean transfer time" logs/data_loader_1_8_1_False_1.out
grep "Global time" logs/data_loader_1_8_1_False_1.out
echo ">>> Pin memory = True "
grep "Mean transfer time" logs/data_loader_1_8_1_True_1.out
grep "Global time" logs/data_loader_1_8_1_True_1.out

>>> Pin memory = False 
Rank 0: Mean transfer time = 1066 μs (max = 1399016 μs, reached at 0th transfer)
Rank 0: Global time = 64.90769052505493 s
>>> Pin memory = True 
Rank 0: Mean transfer time = 84 μs (max = 761 μs, reached at 0th transfer)
Rank 0: Global time = 68.2191755771637 s
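
A quick calculation helps to read this result: the mean transfer time drops by more than a factor of ten, yet the global time does not improve. The sketch below estimates the cumulative time spent in the `.to()` calls, assuming 7,500 batches per run (60,000 MNIST training images divided by a batch size of 8). Note that with `non_blocking=True` and pinned memory the `.to()` call can return before the copy has finished, so the timer in the script then measures mostly the time needed to launch the copy rather than the copy itself.

```python
N_BATCHES = 60000 // 8  # assumed: 60,000 training images, batch_size=8

# (mean transfer time in microseconds, global time in seconds), from the output above
runs = {
    "pin_memory=False": (1066, 64.91),
    "pin_memory=True": (84, 68.22),
}

share = {}
for name, (mean_us, global_s) in runs.items():
    total_s = mean_us * 1e-6 * N_BATCHES
    share[name] = total_s / global_s
    print(f"{name}: {total_s:.2f} s in transfers ({share[name]:.1%} of global time)")
```

Under these assumptions the transfers account for roughly an eighth of the run at most. With a single worker the loop therefore appears to be dominated by the pre-processing on the CPU, and the difference in global time between the two runs may simply be run-to-run variation.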


### Pre-fetching batches

* Estimate of the time gain when the number of pre-fetched batches is increased

In [25]:
# create and execute scripts with increasing prefetch_factor (prefetch_factor=1 already done in ref job)
for prefetch_factor in [2,3,4]:
 slurm_fname = create_new_scripts(prefetch_factor=prefetch_factor)
 !sbatch $slurm_fname

Submitted batch job 943199
Submitted batch job 943200
Submitted batch job 943201


In [26]:
display_slurm_queue()


 Done!


In [27]:
%%bash
for factor in 1 2 3 4
do
 echo ">>> prefetch_factor = $factor" 
 grep "Global time" logs/data_loader_1_8_1_False_${factor}.out
done

>>> prefetch_factor = 1
Rank 0: Global time = 64.90769052505493 s
>>> prefetch_factor = 2
Rank 0: Global time = 52.71680998802185 s
>>> prefetch_factor = 3
Rank 0: Global time = 52.24641942977905 s
>>> prefetch_factor = 4
Rank 0: Global time = 52.70093059539795 s
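
In the PyTorch `DataLoader`, `prefetch_factor` is the number of batches loaded in advance by each worker, so the number of batches held in CPU memory grows with `prefetch_factor * num_workers`. The sketch below estimates that footprint, assuming single-channel 300x300 `float32` images (MNIST after `Resize((300,300))` and `ToTensor()`) and ignoring the labels. The helper name `prefetch_footprint_mb` is hypothetical.

```python
def prefetch_footprint_mb(batch_size, num_workers, prefetch_factor,
                          channels=1, height=300, width=300, bytes_per_value=4):
    """Approximate size (in MB) of the image batches waiting in the prefetch queue."""
    batch_bytes = batch_size * channels * height * width * bytes_per_value
    n_batches = num_workers * prefetch_factor
    return n_batches * batch_bytes / 1e6


optimised_mb = prefetch_footprint_mb(batch_size=128, num_workers=10, prefetch_factor=2)
reference_mb = prefetch_footprint_mb(batch_size=8, num_workers=1, prefetch_factor=1)
print(f"optimised configuration: about {optimised_mb:.0f} MB per process")
print(f"reference configuration: about {reference_mb:.1f} MB per process")
```

This is one reason to raise `prefetch_factor` gradually: in the measurements above the gain levels off after a value of 2, while the estimated memory cost keeps growing linearly.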
