# PyTorch: Multi-GPU and multi-node Data Parallelism
## Implementation

*Notebook written by the IDRIS AI support team, November 2020*

This document presents the method to use on Jean Zay to distribute your PyTorch training depending on the **Data Parallelism** method. Pytorch documentation is used as [reference](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html) and illustrates the [IDRIS documentation](http://www.idris.fr/eng/jean-zay/gpu/jean-zay-gpu-torch-multi-eng.html).

In this example, we are training a convolutional neural network on the MNIST database. 
The learning executes on several Jean Zay GPUs and compute nodes.

It consists here of: 
* Preparing the MNIST database 
* Writing the Python script for distributed learning (Data Parallelism) 
* Running a parallel execution on Jean Zay

Note that the MNIST data and the model used in this example are very simple.

This allows us to present a short code and to test the Data Parallelism configuration rapidly, but not to measure an acceleration of the training. In fact, the transfer time between GPUs, together with the initialization time of the GPU kernels, is sizeable in relation to the execution times.

------------------------

### Computing environment

This notebook is intended for execution from a Jean Zay front end. The hostname must be jean-zay[1-5].

In [2]:
!hostname

jean-zay2


A PyTorch module must be loaded beforehand in order for this Notebook to function correctly. For example, the ``pytorch-gpu/py3/1.7.0`` module:

In [3]:
!module list

[?1h=Currently Loaded Modulefiles:[m
 1) gcc/8.3.1 4) cudnn/8.0.4.30-cuda-10.2 7) [4mopenmpi/4.0.5-cuda[0m [m
 2) cuda/10.2 5) intel-mkl/2020.4 8) pytorch-gpu/py3/1.8.0 [m
 3) nccl/2.8.3-1-cuda 6) magma/2.5.4-cuda [m
[K[?1l>

Creation of a ``checkpoint`` file if it doesn't exist.

In [4]:
!mkdir checkpoint
!rm checkpoint/*

mkdir: cannot create directory ‘checkpoint’: File exists


------------------------------------

### Preparation of the MNIST database

The MNIST database is available in the DSDIR of Jean Zay.

**Comment**: The DSDIR, like the SCRATCH, is a GPFS disk space of which the bandwidth is about 300 GB/s in write and in read. These are the preferred spaces for codes having intense usage for input/output operations. Your personal SCRATCH space is dedicated to your private databases and the common DSDIR space includes most of the public databases. 

You can test the data access with the following command:

In [5]:
import os
import torchvision
import torchvision.transforms as transforms

torchvision.datasets.MNIST(root=os.environ['DSDIR'],
 train=True,
 transform=transforms.ToTensor(),
 download=False)

Dataset MNIST
 Number of datapoints: 60000
 Root location: /gpfsdswork/dataset
 Split: Train
 StandardTransform
Transform: ToTensor()

### Writing the Python script for distributed learning (Data Parallelism)

In this section, we write the Python training script in the
‘mnist-distributed.py’ file.

* Loading libraries and defining the main function:

In [6]:
%%writefile mnist-distributed.py 

import os
from datetime import datetime
from time import time
import argparse
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
import idr_torch

def main():
 parser = argparse.ArgumentParser()
 parser.add_argument('-b', '--batch-size', default=128, type =int,
 help='batch size. it will be divided in mini-batch for each worker')
 parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
 help='number of total epochs to run')
 parser.add_argument('-c','--checkpoint', default=None, type=str,
 help='path to checkpoint to load')
 args = parser.parse_args()

 train(args) 

Overwriting mnist-distributed.py


* Creation of the learning model (shallow convolutional neural network with 2 layers):

In [7]:
%%writefile -a mnist-distributed.py

class ConvNet(nn.Module):
 def __init__(self, num_classes=10):
 super(ConvNet, self).__init__()
 self.layer1 = nn.Sequential(
 nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
 nn.BatchNorm2d(16),
 nn.ReLU(),
 nn.MaxPool2d(kernel_size=2, stride=2))
 self.layer2 = nn.Sequential(
 nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
 nn.BatchNorm2d(32),
 nn.ReLU(),
 nn.MaxPool2d(kernel_size=2, stride=2))
 self.fc = nn.Linear(7*7*32, num_classes)

 def forward(self, x):
 out = self.layer1(x)
 out = self.layer2(out)
 out = out.reshape(out.size(0), -1)
 out = self.fc(out)
 return out


Appending to mnist-distributed.py


* Defining the distributed learning function (the timers and displays are managed by process 0, which is the master process)

In [8]:
%%writefile -a mnist-distributed.py

def train(args):
 
 # configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
 dist.init_process_group(backend='nccl', init_method='env://', world_size=idr_torch.size, rank=idr_torch.rank)
 
 # distribute model
 torch.cuda.set_device(idr_torch.local_rank)
 gpu = torch.device("cuda")
 model = ConvNet().to(gpu)
 ddp_model = DistributedDataParallel(model, device_ids=[idr_torch.local_rank])
 if args.checkpoint is not None:
 map_location = {'cuda:%d' % 0: 'cuda:%d' % idr_torch.local_rank}
 ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
 
 # distribute batch size (mini-batch)
 batch_size = args.batch_size 
 batch_size_per_gpu = batch_size // idr_torch.size
 
 # define loss function (criterion) and optimizer
 criterion = nn.CrossEntropyLoss() 
 optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4)

 # load data with distributed sampler
 train_dataset = torchvision.datasets.MNIST(root=os.environ['DSDIR'],
 train=True,
 transform=transforms.ToTensor(),
 download=False)
 
 train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
 num_replicas=idr_torch.size,
 rank=idr_torch.rank)
 
 train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
 batch_size=batch_size_per_gpu,
 shuffle=False,
 num_workers=0,
 pin_memory=True,
 sampler=train_sampler)

 # training (timers and display handled by process 0)
 if idr_torch.rank == 0: start = datetime.now() 
 total_step = len(train_loader)
 
 for epoch in range(args.epochs):
 if idr_torch.rank == 0: start_dataload = time()
 
 for i, (images, labels) in enumerate(train_loader):
 
 # distribution of images and labels to all GPUs
 images = images.to(gpu, non_blocking=True)
 labels = labels.to(gpu, non_blocking=True) 
 
 if idr_torch.rank == 0: stop_dataload = time()

 if idr_torch.rank == 0: start_training = time()
 
 # forward pass
 outputs = ddp_model(images)
 loss = criterion(outputs, labels)

 # backward and optimize
 optimizer.zero_grad()
 loss.backward()
 optimizer.step()
 
 if idr_torch.rank == 0: stop_training = time() 
 if (i + 1) % 200 == 0 and idr_torch.rank == 0:
 print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch + 1, args.epochs,
 i + 1, total_step, loss.item(), (stop_dataload - start_dataload)*1000,
 (stop_training - start_training)*1000))
 if idr_torch.rank == 0: start_dataload = time()
 
 #Save checkpoint at every end of epoch
 if idr_torch.rank == 0:
 torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(idr_torch.size, epoch+1))

 if idr_torch.rank == 0:
 print(">>> Training complete in: " + str(datetime.now() - start))


Appending to mnist-distributed.py


* Defining the principal function:

In [9]:
%%writefile -a mnist-distributed.py

if __name__ == '__main__':
 
 # get distributed configuration from Slurm environment
 NODE_ID = os.environ['SLURM_NODEID']
 MASTER_ADDR = os.environ['MASTER_ADDR']
 
 # display info
 if idr_torch.rank == 0:
 print(">>> Training on ", len(idr_torch.hostnames), " nodes and ", idr_torch.size, " processes, master node is ", MASTER_ADDR)
 print("- Process {} corresponds to GPU {} of node {}".format(idr_torch.rank, idr_torch.local_rank, NODE_ID))

 main()

Appending to mnist-distributed.py


### Example of mono-GPU mono-node execution

* Writing the submission batch script

**Remember**: If your single project has both CPU and GPU hours, or if your login is attached to more than one project, you must specify for which allocation the consumed hours should be counted by adding the option ``--account=my_project@gpu`` as explained in the [IDRIS documentation](http://www.idris.fr/eng/jean-zay/cpu/jean-zay-cpu-doc_account-eng.html).

In [10]:
%%writefile batch_monogpu.slurm
#!/bin/sh
#SBATCH --job-name=mnist_pytorch_monogpu
#SBATCH --output=mnist_pytorch_monogpu.out
#SBATCH --error=mnist_pytorch_monogpu.out
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --gres=gpu:1
#SBATCH --cpus-per-task=10
#SBATCH --hint=nomultithread
#SBATCH --time=00:10:00
#SBATCH --qos=qos_gpu-dev

# go into the submission directory 
cd ${SLURM_SUBMIT_DIR}

# cleans out modules loaded in interactive and inherited by default
module purge

# loading modules
module load pytorch-gpu/py3/1.7.0

# echo of launched commands
set -x

# code execution
srun python -u mnist-distributed.py --epochs 8 --batch-size 128

Overwriting batch_monogpu.slurm


* Submission of the batch script and display of the output

In [11]:
%%bash
# submit job
sbatch batch_monogpu.slurm

Submitted batch job 210595


In [12]:
# watch Slurm queue line until the job is done
# execution should take about 1 minute
import time
sq = !squeue -u $USER -n mnist_pytorch_monogpu
print(sq[0])
while len(sq) >= 2:
 print(sq[1],end='\r')
 time.sleep(5)
 sq = !squeue -u $USER -n mnist_pytorch_monogpu
print('\n Done!')

 JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON) 
 210595 gpu_p13 mnist_py ssos040 R 0:51 1 r10i2n5 
 Done!


In [13]:
# display output
%cat mnist_pytorch_monogpu.out

Loading pytorch-gpu/py3/1.7.0
 Loading requirement: gcc/8.3.1 cuda/10.2 nccl/2.6.4-1-cuda
 cudnn/7.6.5.32-cuda-10.2 intel-mkl/2020.1 magma/2.5.3-cuda
 openmpi/4.0.2-cuda
+ srun python -u mnist-distributed.py --epochs 8 --batch-size 128
>>> Training on 1 nodes and 1 processes, master node is r10i2n5
- Process 0 corresponds to GPU 0 of node 0
Epoch [1/8], Step [200/469], Loss: 1.9836, Time data load: 9.957ms, Time training: 2.359ms
Epoch [1/8], Step [400/469], Loss: 1.8060, Time data load: 10.119ms, Time training: 2.400ms
Epoch [2/8], Step [200/469], Loss: 1.4512, Time data load: 11.848ms, Time training: 4.598ms
Epoch [2/8], Step [400/469], Loss: 1.3760, Time data load: 9.945ms, Time training: 2.332ms
Epoch [3/8], Step [200/469], Loss: 1.1132, Time data load: 9.964ms, Time training: 2.336ms
Epoch [3/8], Step [400/469], Loss: 1.1073, Time data load: 10.023ms, Time training: 2.385ms
Epoch [4/8], Step [200/469], Loss: 0.8998, Time data load: 10.277ms, Time training: 2.374ms
Ep

### Example of multi-GPU mono-node execution

* Writing the submission batch script

**Remember**: If your single project has both CPU and GPU hours, or if your login is attached to more than one project, you must specify for which allocation the consumed hours should be counted by adding the option ``--account=my_project@gpu`` as explained in the [IDRIS documentation](http://www.idris.fr/eng/jean-zay/cpu/jean-zay-cpu-doc_account-eng.html).

In [14]:
%%writefile batch_mononode.slurm
#!/bin/sh
#SBATCH --job-name=mnist_pytorch_mononode
#SBATCH --output=mnist_pytorch_mononode.out
#SBATCH --error=mnist_pytorch_mononode.out
#SBATCH --nodes=1
#SBATCH --ntasks=4
#SBATCH --gres=gpu:4
#SBATCH --cpus-per-task=10
#SBATCH --hint=nomultithread
#SBATCH --time=00:10:00
#SBATCH --qos=qos_gpu-dev

# go into the submission directory 
cd ${SLURM_SUBMIT_DIR}

# cleans out modules loaded in interactive and inherited by default
module purge

# loading modules
module load pytorch-gpu/py3/1.7.0

# echo of launched commands
set -x

# code execution
srun python -u mnist-distributed.py --epochs 8 --batch-size 128

Overwriting batch_mononode.slurm


* Submission of the batch script and display of the output

In [15]:
%%bash
# submit job
sbatch batch_mononode.slurm

Submitted batch job 210599


In [16]:
# watch Slurm queue line until the job is done
# execution should take less than 1 minute
import time
sq = !squeue -u $USER -n mnist_pytorch_mononode
print(sq[0])
while len(sq) >= 2:
 print(sq[1],end='\r')
 time.sleep(5)
 sq = !squeue -u $USER -n mnist_pytorch_mononode
print('\n Done!')

 JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON) 
 210599 gpu_p13 mnist_py ssos040 R 0:23 1 r10i7n0 
 Done!


In [17]:
#display output 
%cat mnist_pytorch_mononode.out

Loading pytorch-gpu/py3/1.7.0
 Loading requirement: gcc/8.3.1 cuda/10.2 nccl/2.6.4-1-cuda
 cudnn/7.6.5.32-cuda-10.2 intel-mkl/2020.1 magma/2.5.3-cuda
 openmpi/4.0.2-cuda
+ srun python -u mnist-distributed.py --epochs 8 --batch-size 128
- Process 3 corresponds to GPU 3 of node 0
>>> Training on 1 nodes and 4 processes, master node is r10i7n0
- Process 0 corresponds to GPU 0 of node 0
- Process 1 corresponds to GPU 1 of node 0
- Process 2 corresponds to GPU 2 of node 0
Epoch [1/8], Step [200/469], Loss: 1.9805, Time data load: 2.751ms, Time training: 2.274ms
Epoch [1/8], Step [400/469], Loss: 1.7145, Time data load: 2.744ms, Time training: 2.271ms
Epoch [2/8], Step [200/469], Loss: 1.4873, Time data load: 2.740ms, Time training: 2.270ms
Epoch [2/8], Step [400/469], Loss: 1.2656, Time data load: 2.743ms, Time training: 2.273ms
Epoch [3/8], Step [200/469], Loss: 1.1683, Time data load: 2.735ms, Time training: 2.314ms
Epoch [3/8], Step [400/469], Loss: 1.0061, Time data load:

### Example of multi-GPU multi-node execution

* Writing the submission batch script

**Remember**: If your single project has both CPU and GPU hours, or if your login is attached to more than one project, you must specify for which allocation the consumed hours should be counted by adding the option ``--account=my_project@gpu`` as explained in the [IDRIS documentation](http://www.idris.fr/eng/jean-zay/cpu/jean-zay-cpu-doc_account-eng.html).


In [18]:
%%writefile batch_multinode.slurm
#!/bin/sh
#SBATCH --job-name=mnist_pytorch_multinode
#SBATCH --output=mnist_pytorch_multinode.out
#SBATCH --error=mnist_pytorch_multinode.out
#SBATCH --nodes=3
#SBATCH --gres=gpu:4
#SBATCH --ntasks-per-node=4
#SBATCH --cpus-per-task=10
#SBATCH --hint=nomultithread
#SBATCH --time=00:10:00
#SBATCH --qos=qos_gpu-dev

# go into the submission directory 
cd ${SLURM_SUBMIT_DIR}

# cleans out modules loaded in interactive and inherited by default
module purge

# loading modules
module load pytorch-gpu/py3/1.7.0

# echo of launched commands
set -x

# code execution
srun python -u mnist-distributed.py --epochs 8 --batch-size 128

Overwriting batch_multinode.slurm


* Submission of the batch script and display of the output

In [19]:
%%bash
# submit job
sbatch batch_multinode.slurm

Submitted batch job 210604


sbatch: IDRIS: setting exclusive mode for the job.


In [20]:
# watch Slurm queue line until the job is done
# execution should take about 1 minute
import time
sq = !squeue -u $USER -n mnist_pytorch_multinode
print(sq[0])
while len(sq) >= 2:
 print(sq[1],end='\r')
 time.sleep(5)
 sq = !squeue -u $USER -n mnist_pytorch_multinode
print('\n Done!')

 JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON) 
 210604 gpu_p13 mnist_py ssos040 R 0:26 3 r13i5n[7-8],r13i6n0 
 Done!


In [21]:
# display output
%cat mnist_pytorch_multinode.out

Loading pytorch-gpu/py3/1.7.0
 Loading requirement: gcc/8.3.1 cuda/10.2 nccl/2.6.4-1-cuda
 cudnn/7.6.5.32-cuda-10.2 intel-mkl/2020.1 magma/2.5.3-cuda
 openmpi/4.0.2-cuda
+ srun python -u mnist-distributed.py --epochs 8 --batch-size 128
>>> Training on 3 nodes and 12 processes, master node is r13i5n7
- Process 0 corresponds to GPU 0 of node 0
- Process 1 corresponds to GPU 1 of node 0
- Process 2 corresponds to GPU 2 of node 0
- Process 3 corresponds to GPU 3 of node 0
- Process 4 corresponds to GPU 0 of node 1
- Process 5 corresponds to GPU 1 of node 1
- Process 10 corresponds to GPU 2 of node 2
- Process 6 corresponds to GPU 2 of node 1
- Process 7 corresponds to GPU 3 of node 1
- Process 11 corresponds to GPU 3 of node 2
- Process 8 corresponds to GPU 0 of node 2
- Process 9 corresponds to GPU 1 of node 2
Epoch [1/8], Step [200/500], Loss: 2.1815, Time data load: 1.044ms, Time training: 2.526ms
Epoch [1/8], Step [400/500], Loss: 1.6933, Time data load: 1.042ms, Tim

### Example of multi-node execution from a checkpoint

* Writing the submission batch script

**Remember**: If your single project has both CPU and GPU hours, or if your login is attached to more than one project, you must specify for which allocation the consumed hours should be counted by adding the option ``--account=my_project@gpu`` as explained in the [IDRIS documentation](http://www.idris.fr/eng/jean-zay/cpu/jean-zay-cpu-doc_account-eng.html).

In [22]:
%%writefile batch_multinode.slurm
#!/bin/sh
#SBATCH --job-name=mnist_pytorch_multinode
#SBATCH --output=mnist_pytorch_multinode.out
#SBATCH --error=mnist_pytorch_multinode.out
#SBATCH --nodes=3
#SBATCH --gres=gpu:4
#SBATCH --ntasks-per-node=4
#SBATCH --cpus-per-task=10
#SBATCH --hint=nomultithread
#SBATCH --time=00:10:00
#SBATCH --qos=qos_gpu-dev

# go into the submission directory 
cd ${SLURM_SUBMIT_DIR}

# cleans out modules loaded in interactive and inherited by default
module purge

# loading modules
module load pytorch-gpu/py3/1.7.0

# echo of launched commands
set -x

# code execution
srun python -u mnist-distributed.py --epochs 8 --batch-size 128 -c ./checkpoint/12GPU_8epoch.checkpoint

Overwriting batch_multinode.slurm


In [23]:
%%bash
# submit job
sbatch batch_multinode.slurm

Submitted batch job 210608


sbatch: IDRIS: setting exclusive mode for the job.


In [24]:
# watch Slurm queue line until the job is done
# execution should take about 1 minute
import time
sq = !squeue -u $USER -n mnist_pytorch_multinode
print(sq[0])
while len(sq) >= 2:
 print(sq[1],end='\r')
 time.sleep(5)
 sq = !squeue -u $USER -n mnist_pytorch_multinode
print('\n Done!')

 JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON) 
 210608 gpu_p13 mnist_py ssos040 R 0:19 3 r13i5n[7-8],r13i6n0 
 Done!


In [25]:
# display output
%cat mnist_pytorch_multinode.out

Loading pytorch-gpu/py3/1.7.0
 Loading requirement: gcc/8.3.1 cuda/10.2 nccl/2.6.4-1-cuda
 cudnn/7.6.5.32-cuda-10.2 intel-mkl/2020.1 magma/2.5.3-cuda
 openmpi/4.0.2-cuda
+ srun python -u mnist-distributed.py --epochs 8 --batch-size 128 -c ./checkpoint/12GPU_8epoch.checkpoint
- Process 3 corresponds to GPU 3 of node 0
>>> Training on 3 nodes and 12 processes, master node is r13i5n7
- Process 0 corresponds to GPU 0 of node 0
- Process 1 corresponds to GPU 1 of node 0
- Process 2 corresponds to GPU 2 of node 0
- Process 4 corresponds to GPU 0 of node 1
- Process 10 corresponds to GPU 2 of node 2
- Process 7 corresponds to GPU 3 of node 1
- Process 5 corresponds to GPU 1 of node 1
- Process 6 corresponds to GPU 2 of node 1
- Process 11 corresponds to GPU 3 of node 2
- Process 8 corresponds to GPU 0 of node 2
- Process 9 corresponds to GPU 1 of node 2
Epoch [1/8], Step [200/500], Loss: 0.6191, Time data load: 1.076ms, Time training: 2.862ms
Epoch [1/8], Step [400/500], Lo