Parallélisme de données avec PyTorch
Cette page explique comment distribuer un modèle neuronal artificiel implémenté dans un code PyTorch, selon la méthode du parallélisme de données.
Nous documentons ici la solution intégrée DistributedDataParallel, qui est la plus performante selon la documentation PyTorch. Il s'agit d'un parallélisme multi-process qui fonctionne aussi bien en mono-nœud qu'en multi-nœuds.
Un exemple applicatif est proposé sous forme d'un Notebook en bas de page pour vous permettre d'accéder à une implémentation fonctionnelle des explications données ci-dessous.
Configuration multi-process avec Slurm
Pour le multi-nœuds, il est nécessaire d'utiliser le multi-processing géré par Slurm (exécution via la commande Slurm srun). Pour le
mono-nœud il est possible d'utiliser, comme la documentation PyTorch l'indique, torch.multiprocessing.spawn. Cependant il est possible et plus pratique d'utiliser le multi-processing Slurm dans tous les cas, en mono-nœud ou en multi-nœuds. C'est ce que nous documentons dans cette page.
Dans Slurm, lorsqu'on lance un script avec la commande srun, le script est automatiquement distribué sur toutes les tâches prédéfinies. Par exemple, si nous réservons 4 nœuds octo-GPU en demandant 3 GPU par nœud, nous obtenons :
- 4 nœuds, indexés de 0 à 3
- 3 GPU/nœud indexés de 0 à 2 sur chaque nœud
- 4 x 3 = 12 processus au total permettant d'exécuter 12 tâches avec les rangs de 0 à 11

Illustration d'une réservation Slurm de 4 nœuds et 3 GPU par nœud, soit 12 processus.
Les communications collectives inter-nœuds sont gérées par la librairie NCCL.
Voici deux exemples de script Slurm pour Jean-Zay :
- pour une réservation de N nœuds quadri-GPU V100 via la partition GPU par défaut :
#!/bin/bash
#SBATCH --job-name=torch-multi-gpu
#SBATCH --nodes=N # nombre total de noeuds (N à définir)
#SBATCH --ntasks-per-node=4 # nombre de taches par noeud (ici 4 taches soit 1 tache par GPU)
#SBATCH --gres=gpu:4 # nombre de GPU reserve par noeud (ici 4 soit tous les GPU)
#SBATCH --cpus-per-task=10 # nombre de coeurs par tache (donc 4x10 = 40 coeurs soit tous les coeurs)
#SBATCH --hint=nomultithread
#SBATCH --time=20:00:00
#SBATCH --output=torch-multi-gpu%j.out
##SBATCH --account=abc@v100
module load pytorch-gpu/py3/2.5.0
srun python myscript.py
- pour une réservation de N nœuds octo-GPU A100 :
#!/bin/bash
#SBATCH --job-name=torch-multi-gpu
#SBATCH --nodes=N # nombre total de noeuds (N à définir)
#SBATCH --ntasks-per-node=8 # nombre de taches par noeud (ici 8 taches soit 1 tache par GPU)
#SBATCH --gres=gpu:8 # nombre de GPU reserve par noeud (ici 8 soit tous les GPU)
#SBATCH --cpus-per-task=8 # nombre de coeurs par tache (donc 8x8 = 64 coeurs soit tous les coeurs)
#SBATCH --hint=nomultithread
#SBATCH --time=20:00:00
#SBATCH --output=torch-multi-gpu%j.out
#SBATCH -C a100
##SBATCH --account=abc@a100
module load arch/a100
module load pytorch-gpu/py3/2.5.0
srun python myscript.py
Dans ces deux exemples, les nœuds sont réservés en exclusivité. En particulier, cela nous donne accès à toute la mémoire de chaque nœud.
Implémentation de la solution DistributedDataParallel
Pour implémenter la solution DistributedDataParallel en PyTorch, il faut:
-
Définir les variables d'environnement liées au nœud maître pour configurer les communications inter-nœuds (nécessaire même en mono-nœud) :
MASTER_ADD, l'adresse IP ou le hostname du nœud correspondant à la tâche 0 (le premier de la liste des nœuds). Si on est en mono-nœud, la valeurlocalhostsuffit.MASTER_PORT, un numéro de port aléatoire. Pour éviter les conflits et par convention on utilisera un numéro de port compris entre10001et20000(par exemple12345).
idr_torchSur Jean Zay, une librairie
idr_torcha été développée par l'IDRIS pour définir automatiquement les variablesMASTER_ADDetMASTER_PORTau sein d'un job Slurm en cours. Elle est disponible dans tous nos environnements PyTorch. Pour l'utiliser, il suffit de l'importer dans votre script :import idr_torchUne fois
idr_torchimporté, vous avez également accès aux variables internes :idr_torch.rank(rang global du processus),idr_torch.local_rank(rang local du processus au sein de son nœud),idr_torch.size(nombre de processus),idr_torch.cpus_per_task(nombre de coeurs CPU alloués par processus).
infoidr_torchest disponible sur le GitHub de l'IDRIS. -
Initialiser l'environnement distribué (i.e. le communicateur) en appelant la fonction
init_process_group:import idr_torchimport torch.distributed as distdist.init_process_group(backend='nccl',init_method='env://',world_size=idr_torch.size,rank=idr_torch.rank)- Le backend indique la librairie qui sera utilisée pour les commmunications. Les backends possibles sont
NCCL,GLOOetMPI.NCCLest la librairie de communication implémentée par NVIDIA et est donc fortement recommandée sur Jean Zay ; init_method='env://'indique à PyTorch de rechercher les informations sur le nœud maître dans l'environnement (variables d'environnementMASTER_ADDRetMASTER_PORT, voir 1.) ;world_sizecorrespond au nombre de GPU alloués à l'exécution ;rankcorrespond au rang global du processus courant.
- Le backend indique la librairie qui sera utilisée pour les commmunications. Les backends possibles sont
-
Transférer le modèle en mémoire sur le GPU associé au processus courant, identifié par son rang local (
local_rank) :torch.cuda.set_device(idr_torch.local_rank)gpu = torch.device("cuda")model = model.to(gpu) -
Récupérer dans une nouvelle variable
ddp_modella version du modèle associée au processus courant :from torch.nn.parallel import DistributedDataParallel as DDP[...]ddp_model = DDP(model, device_ids=[idr_torch.local_rank]) -
Utiliser un sampler distribué pour répartir les batches en mini-batches sur les différents GPU à chaque itération. PyTorch fournit pour cela la classe
DistributedSamplerqui prend en argument le dataset d'entrée, le nombre de GPU disponibles, le rang global du processus courant et l'ordre de shuffling :train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=idr_torch.size,rank=idr_torch.rank,shuffle=True)importantLa classe
DistributedSamplerprend en charge le shuffling en début de chaque epoch. Pour que la graine de génération aléatoire varie bien, il faut indiquer les changements d'epoch au sampler :for epoch in range(n_epochs):train_sampler.set_epoch(epoch) -
Adapter le
DataLoaderpour qu'il utilise le sampler distribué et une taille de batch correspondante au mini-batch :batch_size_per_gpu = global_batch_size // idr_torch.sizetrain_loader = torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False, # shuffling délégué au samplersampler=train_sampler)remarqueL'étape de shuffling est déléguée au sampler distribué dans ce cas.
-
Envoyer les mini-batches et les labels correspondants en mémoire sur les GPU :
for (images, labels) in train_loader:images = images.to(gpu)labels = labels.to(gpu)
Sauvegarde et chargement de checkpoints
Il est possible de mettre en place des checkpoints lors d'un apprentissage distribué sur des GPU.
Sauvegarde
Le modèle est répliqué sur chaque GPU mais la sauvegarde de checkpoints peut être réalisée par un seul GPU pour limiter les opérations d'écriture. Par convention, on sollicite le GPU de rang 0 :
if idr_torch.rank == 0:
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
Ainsi, le checkpoint contiendra l'information issue du GPU de rang 0 qui est alors sauvegardée dans un format spécifique aux modèles distribués.
Chargement
Au début d'un apprentissage, le chargement d'un checkpoint est d'abord opéré par le CPU, puis l'information est envoyée sur le GPU.
Par défaut et par convention, cet envoi se fait vers l'emplacement mémoire qui a été utilisé lors de l'étape de sauvegarde : avec notre exemple, seul le GPU 0 chargerait le modèle en mémoire.
Pour que l'information soit communiquée à l'ensemble des GPU, il faut utiliser l'argument map_location de la fonction de chargement
torch.load pour rediriger le stockage en mémoire.
Dans l'exemple ci-dessous, l'argument map_location ordonne une redirection du stockage en mémoire vers le GPU de rang local. Cette fonction étant appelée par l'ensemble des GPU, chaque GPU charge bien le checkpoint dans sa propre mémoire :
map_location = {'cuda:%d' % 0: 'cuda:%d' % idr_torch.local_rank} # remap storage from GPU 0 to local GPU
ddp_model.load_state_dict(torch.load(CHECKPOINT_PATH), map_location=map_location)) # load checkpoint
Si, comme dans le tutoriel PyTorch, un checkpoint est chargé juste après une sauvegarde, il est nécessaire d'appeler la méthode dist.barrier() avant le chargement. L'appel à dist.barrier() permet de synchroniser les GPU, garantissant ainsi que la sauvegarde du checkpoint par le GPU de rang 0 est bien achevée avant que les autres GPU tentent de le charger.
Validation distribuée
L'étape de validation exécutée après chaque epoch ou après un nombre fixé d'itérations d'apprentissage peut se distribuer sur tous les GPU engagés dans l'apprentissage du modèle. Lorsque le parallélisme de données est utilisé et que l'ensemble de données de validation est conséquent, cette solution de validation distribuée sur les GPU semble être la plus efficace et la plus rapide.
Ici, l'enjeu est de calculer les métriques (loss, accuracy, etc...) par batch et par GPU, puis de les pondérer et de les moyenner sur l'ensemble des données de validation.
Pour cela, il faut:
- charger les données de validation de la même manière que les données d'apprentissage, mais sans les transformations aléatoires comme la data augmentation ou le shuffling (voir la documentation sur le chargement de données pour apprentissage distribué en PyTorch) :
# validation dataset loading (imagenet for example)val_dataset = torchvision.datasets.ImageNet(root=root,split='val',transform=val_transform)# define distributed sampler for validationval_sampler = torch.utils.data.distributed.DistributedSampler(val_dataset,num_replicas=idr_torch.size,rank=idr_torch.rank,shuffle=False)# define dataloader for validationval_loader = torch.utils.data.DataLoader(dataset=val_dataset,batch_size=batch_size_per_gpu,shuffle=False,sampler=val_sampler,)
- basculer du mode "apprentissage" au mode "validation" pour désactiver certaines fonctionnalités propres à l'entraînement qui
sont coûteuses et ici inutiles :
model.eval()pour basculer le modèle en mode "validation" et désactiver la gestion des dropout, des batchnorm, etc.with torch.no_grad()pour ignorer le calcul du gradient- optionnellement,
with autocast()pour utiliser l'AMP (précision mixte)
- évaluer le modèle et calculer la métrique par batch de la manière habituelle (ici, nous prenons l'exemple du calcul de la
loss, cela sera la même chose pour d'autres métriques) :
outputs = model(val_images)suivi deloss = criterion(outputs, val_labels)
- pondérer et cumuler la métrique par GPU :
val_loss += loss * val_images.size(0) / Navecval_images.size(0)la taille du batch etNla taille globale du dataset de validation. Sachant que les batches n'ont pas nécessairement la même taille (dernier batch parfois plus petit), il est préférable d'utiliser ici la valeurval_images.size(0).
- sommer les moyennes pondérées de la métrique sur l'ensemble des GPU :
dist.all_reduce(val_loss, op=dist.ReduceOp.SUM)pour sommer les valeurs de la métrique calculées par GPU et communiquer le résultat à l'ensemble des GPU. Cette opération entraîne des communications inter-GPU.
Exemple après chargement des données de validation
model.eval() # switch into validation mode
val_loss = torch.Tensor([0.]).to(gpu) # initialize val_loss value
N = len(val_dataset) # get validation dataset length
for val_images, val_labels in val_loader: # loop over validation batches
val_images = val_images.to(gpu, non_blocking=True) # transfer images and labels to GPUs
val_labels = val_labels.to(gpu, non_blocking=True)
with torch.no_grad(): # deactivate gradient computation
with autocast(): # activate AMP
outputs = model(val_images) # evaluate model
loss = criterion(outputs, val_labels) # compute loss
val_loss += loss * val_images.size(0) / N # cumulate weighted mean per GPU
dist.all_reduce(val_loss, op=dist.ReduceOp.SUM) # sum weighted means and broadcast value to each GPU
model.train() # switch again into training mode
Exemple d'application
Exécution multi-GPU, multi-nœuds avec DistributedDataParallel
Un exemple se trouve dans $DSDIR/examples_IA/Torch_parallel/Example_DataParallelism_Pytorch.ipynb sur Jean-Zay, il utilise la base de données MNIST et un réseau dense simple. L'exemple est un Notebook qui permet de créer un script d'exécution. Il est à copier sur votre espace personnel (idéalement sur votre $WORK).
cp $DSDIR/examples_IA/Torch_parallel/Example_DataParallelism_PyTorch.ipynb $WORK
Vous pouvez ensuite exécuter le Notebook à partir d'une machine frontale de Jean Zay en sélectionnant un noyau PyTorch (voir notre documentation sur l'accès à JupyterHub pour en savoir plus sur l'usage des Notebooks sur Jean Zay).