Parallélisme de données avec Horovod
Horovod est une brique logicielle permettant le parallélisme de données pour TensorFlow, Keras, PyTorch, et Apache MXNet. L'objectif d'Horovod est de rendre le code performant et facile à implémenter.
Dans les exemples de la communauté IA, Horovod est souvent utilisé avec Tensorflow pour faciliter l'implémentation du parallélisme de données.
Horovod s'appuie sur les librairies de communication MPI et NCCL pour échanger les données entre les processus de calcul.
Stack Horovod, également compatible avec la librairie PyTorch
et le réseau d'interconnexion OmniPath. Source.
Nous documentons ici les solutions Horovod sur Jean Zay en TensorFlow ou en PyTorch. Il s'agit d'un parallélisme multi-process qui fonctionne aussi bien en mono-nœud qu'en multi-nœuds.
Un exemple applicatif est proposé sous forme d'un Notebook en bas de page pour vous permettre d'accéder à une implémentation fonctionnelle des explications données ci-dessous.
Configuration multi-process avec Slurm
Un des avantages d'Horovod est qu'aucune configuration manuelle n'est nécessaire dans la description de l'environnement distribué. Horovod récupère directement les informations relatives aux GPU, aux machines disponibles ainsi qu'aux protocoles de communication, à partir de l'environnement machine. En particulier, la solution Horovod assure la portabilité de votre code.
Dans Slurm, lorsqu'on lance un script avec la commande srun, le script est automatiquement distribué sur toutes les tâches prédéfinies. Par exemple, si nous réservons 4 nœuds octo-GPU en demandant 3 GPU par nœud, nous obtenons :
- 4 nœuds, indexés de 0 à 3
- 3 GPU/nœud indexés de 0 à 2 sur chaque nœud
- 4 x 3 = 12 processus au total permettant d'exécuter 12 tâches avec les rangs de 0 à 11

Illustration d'une réservation Slurm de 4 nœuds et 3 GPU par nœud, soit 12 processus.
Les communications collectives inter-nœuds sont gérées par la librairie NCCL.
Pour exécuter un code distribué avec Horovod sous l'environnement Slurm, il faut réserver une tâche par GPU impliqué dans le parallélisme de données.
Voici deux exemples de script Slurm pour Jean-Zay :
- pour une réservation de N nœuds quadri-GPU V100 via la partition GPU par défaut :
#!/bin/bash
#SBATCH --job-name=torch-multi-gpu
#SBATCH --nodes=N # nombre total de noeuds (N à définir)
#SBATCH --ntasks-per-node=4 # nombre de taches par noeud (ici 4 taches soit 1 tache par GPU)
#SBATCH --gres=gpu:4 # nombre de GPU reserve par noeud (ici 4 soit tous les GPU)
#SBATCH --cpus-per-task=10 # nombre de coeurs par tache (donc 4x10 = 40 coeurs soit tous les coeurs)
#SBATCH --hint=nomultithread
#SBATCH --time=20:00:00
#SBATCH --output=torch-multi-gpu%j.out
##SBATCH --account=abc@v100
module load pytorch-gpu/py3/2.5.0
srun python myscript.py
- pour une réservation de N nœuds octo-GPU A100 :
#!/bin/bash
#SBATCH --job-name=torch-multi-gpu
#SBATCH --nodes=N # nombre total de noeuds (N à définir)
#SBATCH --ntasks-per-node=8 # nombre de taches par noeud (ici 8 taches soit 1 tache par GPU)
#SBATCH --gres=gpu:8 # nombre de GPU reserve par noeud (ici 8 soit tous les GPU)
#SBATCH --cpus-per-task=8 # nombre de coeurs par tache (donc 8x8 = 64 coeurs soit tous les coeurs)
#SBATCH --hint=nomultithread
#SBATCH --time=20:00:00
#SBATCH --output=torch-multi-gpu%j.out
#SBATCH -C a100
##SBATCH --account=abc@a100
module load arch/a100
module load pytorch-gpu/py3/2.5.0
srun python myscript.py
Dans ces deux exemples, les nœuds sont réservés en exclusivité. En particulier, cela nous donne accès à toute la mémoire de chaque nœud.
Implémentation de la solution Horovod
Il est possible d'utiliser Horovod dans les configurations logicielles suivantes :
- TensorFlow - Voir la documentation Horovod TensorFlow
- TensorFlow et Keras - Voir la documentation Horovod avec Keras
- PyTorch - Voir la documentation Horovod PyTorch
Nous vous renvoyons vers la documentation Horovod pour les détails d'implémentation relatifs à chaque configuration logicielle.
Dans tous les cas, le code diffère légèrement mais les étapes de développement restent identiques. Il faut :
- Importer et initialiser Horovod.
- Rattacher chaque GPU à un processus distinct.
- Augmenter le learning rate proportionnellement au nombre de GPU pour compenser l'augmentation de la taille du batch.
- Distribuer l'optimiseur.
- S'assurer que les variables sont correctement répliquées sur tous les GPU en début d'apprentissage (et après l'initialisation du modèle).
- Sauvegarder des checkpoints uniquement à partir du processus de rang 0.
Par exemple, la solution Horovod implémentée dans un code TensorFlow 2 + Keras est illustrée dans le script suivant :
import tensorflow as tf
import horovod.tensorflow.keras as hvd
# Initialize Horovod
hvd.init()
# Pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
# Build model and dataset
dataset = ...
model = ...
opt = tf.optimizers.Adam(0.001 * hvd.size())
# Horovod: add Horovod DistributedOptimizer.
opt = hvd.DistributedOptimizer(opt)
# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
mnist_model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
optimizer=opt,
metrics=['accuracy'],
experimental_run_tf_function=False)
callbacks = [
# Horovod: broadcast initial variable states from rank 0 to all other processes.
# This is necessary to ensure consistent initialization of all workers when
# training is started with random weights or restored from a checkpoint.
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
callbacks.append(keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))
model.fit(dataset,
steps_per_epoch=500 // hvd.size(),
callbacks=callbacks,
epochs=24,
verbose=1 if hvd.rank() == 0 else 0)
Validation distribuée
L'étape de validation exécutée après chaque epoch ou après un nombre fixé d'itérations d'apprentissage peut se distribuer sur tous les GPU engagés dans l'apprentissage du modèle. Lorsque le parallélisme de données est utilisé et que l'ensemble de données de validation est conséquent, cette solution de validation distribuée sur les GPU semble être la plus efficace et la plus rapide.
Ici, l'enjeu est de calculer les métriques (loss, accuracy, etc...) par batch et par GPU, puis de les pondérer et de les moyenner sur l'ensemble des données de validation.
Pour cela, il faut:
- Charger les données de validation de la même manière que les données d'apprentissage, mais sans les transformations aléatoires
comme la data augmentation ou le shuffling (voir la documentation sur le chargement de données pour l'apprentissage distribué en TensorFlow) :
# validation dataset loading (imagenet for example)val_dataset = tf.data.TFRecordDataset(glob.glob(os.environ['DSDIR']+'/imagenet/validation*'))# define distributed sharding for validationval_dataset = val_dataset.shard(hvd.size(), hvd.rank())# define dataloader for validationval_dataset = val_dataset.map(tf_parse_eval,num_parallel_calls=idr_tf.cpus_per_task).batch(batch_size).prefetch(buffer_size=5)
- Basculer du mode "apprentissage" au mode "validation" pour désactiver certaines fonctionnalités propres à l'entraînement qui
sont coûteuses et inutiles ici :
model(images, training=False)lors de l'évaluation du modèle, pour basculer le modèle en mode "validation" et désactiver la gestion des dropout, des batchnorm, etc.- Enlever
tf.GradientTape()pour ignorer le calcul du gradient.
- Évaluer le modèle et calculer la métrique par batch de la manière habituelle (ici, nous prenons l'exemple du calcul de la loss, cela sera la même chose pour d'autres métriques) :
logits_ = model(images, training=False)suivi deloss_value = loss(labels, logits_).
- Pondérer et cumuler la métrique par GPU :
val_loss.update_state(loss_value, sample_weight=images.shape[0])oùimages.shape[0])est la taille du batch. Sachant que les batches n'ont pas nécessairement la même taille (dernier batch parfois plus petit), il est préférable d'utiliser ici la valeurimages.shape[0].
- Calculer les moyennes pondérées de la métrique sur l'ensemble des GPU :
hvd.allreduce(val_loss.result())pour moyenner les valeurs de la métrique calculées par GPU et communiquer le résultat à l'ensemble des GPU. Cette opération entraîne des communications inter-GPU. Horovod utilise la moyenne comme opération duallReducepar défaut.
Exemple après chargement des données de validation :
loss = tf.losses.SparseCategoricalCrossentropy() # define loss function
val_loss = tf.keras.metrics.Mean() # define Keras metrics
@tf.function # define validation step function
def eval_step(images, labels):
logits_ = model(images, training=False) # evaluate model - switch into validation mode
loss_value = loss(labels, logits_) # compute loss
val_loss.update_state(loss_value, sample_weight=images.shape[0]) # cumulate weighted mean per GP
val_loss.reset_states() # initialize val_loss value
for val_images, val_labels in val_loader:
eval_step(val_images, val_labels) # call eval_step function
val_loss_dist = hvd.allreduce(val_loss.result()) # average weighted means and broadcast value to each GPU
Exemple d'application
Un exemple se trouve dans $DSDIR/examples_IA/Horovod_parallel/Example_DataParallelism_Horovod_Tensorflow.ipynb sur Jean Zay, il utilise la base de données MNIST et un réseau dense simple. L'exemple est un Notebook qui permet de créer un script d'exécution. Il est à copier sur votre espace personnel (idéalement sur votre $WORK).
cp $DSDIR/examples_IA/Horovod_parallel/Example_DataParallelism_Horovod_Tensorflow.ipynb $WORK
Vous pouvez ensuite exécuter le Notebook à partir d'une machine frontale de Jean Zay en sélectionnant un noyau PyTorch ou TensorFlow (voir notre documentation sur l'accès à JupyterHub pour en savoir plus sur l'usage des Notebooks sur Jean Zay).