TensorFlow : Parallélisme de données multi-GPU et multi-nœuds

Cette page explique comment distribuer sur plusieurs GPU un modèle de réseau de neurones implémenté dans un code TensorFlow selon la méthode de parallélisme de données.

Un exemple applicatif est proposé sous forme d'un Notebook en bas de page pour vous permettre d'accéder à une implémentation fonctionnelle des explications ci-dessous.

Implémentation d'une stratégie de distribution

Pour distribuer un modèle en TensorFlow, on définit une stratégie de distribution en créant une instance de la classe tf.distribute.Strategy. Cette stratégie permet de contrôler la manière dont sont répartis les données et les calculs sur les GPU.

Choix de la stratégie

MultiWorkerMirroredStrategy

TensorFlow fournit plusieurs stratégies pré-implémentées. Dans cette documentation, nous présentons seulement la stratégie tf.distribute.MultiWorkerMirroredStrategy. Celle-ci a l'avantage d'être générique dans le sens où elle permet le passage aussi bien en multi-GPU qu'en multi-nœuds, sans perte de performance par rapport aux autres stratégies testées.

En utilisant la stratégie MultiWorkerMirroredStrategy, les variables du modèle sont répliquées sur l'ensemble des GPU détectés. Chaque GPU traite une partie des données (mini-batch) et des opérations de réduction collectives sont effectuées pour agréger les tenseurs et mettre à jour les variables sur chaque GPU à chaque étape.

Environnement multi-workers

La MultiWorkerMirroredStrategy est une version multi-workers de la stratégie tf.distribute.MirroredStrategy. Elle s'exécute sur plusieurs tâches (ou workers), chacune d'entre elles étant assimilée à un nom d'hôte et un numéro de port. L'ensemble des tâches constitue un cluster sur lequel se base la stratégie de distribution pour synchroniser les GPU.

La notion de worker (et de cluster) permet notamment une exécution sur plusieurs nœuds de calcul. Chaque worker peut être associé à un ou plusieurs GPU. Sur Jean Zay, nous conseillons de définir un worker par GPU.

L'environnement multi-workers d'une exécution est automatiquement détectée à partir des variables Slurm définies dans votre script de soumission grâce à la classe tf.distribute.cluster_resolver.SlurmClusterResolver.

La stratégie MultiWorkerMirroredStrategy peut se baser sur deux types de protocoles de communications inter-GPU : gRPC ou NCCL. Sur Jean Zay, il est conseillé de demander l'utilisation du protocole de communication NCCL pour obtenir les meilleures performances.

Déclaration

La déclaration de la stratégie MultiWorkerMirroredStrategy se fait finalement en quelques lignes :

# build multi-worker environment from Slurm variables
cluster_resolver = tf.distribute.cluster_resolver.SlurmClusterResolver(port_base=12345)           
 
# use NCCL communication protocol
implementation = tf.distribute.experimental.CommunicationImplementation.NCCL
communication_options = tf.distribute.experimental.CommunicationOptions(implementation=implementation) 
 
# declare distribution strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy(cluster_resolver=cluster_resolver,
                                                     communication_options=communication_options) 

Remarque : sur Jean Zay, vous pouvez exploiter les numéros de port compris entre 10000 et 20000.

Attention : il y a actuellement une limitation TensorFlow sur la déclaration de la stratégie, celle-ci doit être faite avant tout autre appel à une opération TensorFlow.

Intégration au modèle d'apprentissage

Pour répliquer un modèle sur plusieurs GPU, celui-ci doit être créé dans le contexte strategy.scope().

La méthode .scope() fournit un gestionnaire de contexte qui capture les variables TensorFlow et les communique à chaque GPU en fonction de la stratégie choisie. On y déclare les éléments qui créent les variables relatives au modèle : le chargement d'un modèle enregistré, la déclaration d'un modèle, la fonction model.compile(), l'optimiseur,…

Voici un exemple de déclaration d'un modèle à répliquer sur l'ensemble des GPU :

# get total number of workers
n_workers = int(os.environ['SLURM_NTASKS'])
 
# define batch size
batch_size_per_gpu = 64
global_batch_size = batch_size_per_gpu * n_workers
 
# load dataset
dataset = tf.data.Dataset.from_tensor_slices(...)
# [...]
dataset = dataset.batch(global_batch_size)
 
# model building/compiling need to be within `strategy.scope()`
with strategy.scope():
 
  multi_worker_model = tf.keras.Sequential([
    tf.keras.Input(shape=(28, 28)),
    tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
    #...
    ])
 
  multi_worker_model.compile(
    loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    optimizer=tf.keras.optimizers.SGD(learning_rate=0.001*n_workers),      
    metrics=['accuracy'])

Remarques :

  1. Lors de la création de l'objet dataset, la taille de batch à spécifier est la taille de batch globale et non la taille de batch par GPU.
  2. L'utilisation de certains optimiseurs comme SGD nécessite un ajustement du learning rate proportionnel à la taille de batch globale, et donc au nombre de GPU.

Entraînement distribué d'un modèle de type tf.keras.Model

La fonction model.fit() de la librairie Keras prend en charge automatiquement la distribution de l'entraînement selon la stratégie choisie. L'entraînement est donc lancé de manière habituelle. Par exemple :

multi_worker_model.fit(train_dataset, epochs=10, steps_per_epoch=100)

Remarques :

  • La distribution des données d'entrées sur les différents processus (data sharding) est gérée automatiquement au sein de la fonction model.fit().
  • L'étape d'évaluation se fait automatiquement en mode distribué en passant le dataset d'évaluation à la fonction model.fit() :
    multi_worker_model.fit(train_dataset, epochs=3, steps_per_epoch=100, validation_data=valid_dataset)

Entraînement distribué avec une boucle personnalisée (Custom Loop)

On a vu précédemment comment définir un contexte de distribution suivant la stratégie MultiWorkerMirroredStrategy et notamment l'utilisation de la méthode .scope() pour synchroniser les valeurs des variables qui ont été crées par les workers et faire des copies de ces variables pour chaque GPU d'un worker : 1 worker = 1 exécution du code python, 1 worker peut être amené à travailler avec plusieurs GPUs et dans ce cas, il faut faire des copies des variables sur chaque GPU que le worker manipule, dans le cas de 1 GPU par worker, on exécute autant de fois le code python qu'il y a de GPUs et on ne se soucie pas de la manipulation de plusieurs GPUs avec le même script. Une fois le modèle créé à partir de l'API de Keras dans le .scope(), la méthode .fit() permet facilement de lancer un entraînement distribué.

Pour avoir un plus grand contrôle sur l'entraînement du modèle et son évaluation, il est possible de passer outre l'appel à la méthode .fit() et de définir soi-même sa boucle d'entraînement distribué. Pour cela, il faut faire attention à trois choses :

  1. la synchronisation et la distribution du dataset à travers les workers et les GPUs des workers ;
  2. de bien entraîner son modèle sur un mini-batch pour chaque GPU et d'agglomérer les gradients locaux pour former le gradient global ;
  3. de bien distribuer le calcul des métriques sur chaque GPU et d'agglomérer les métriques locales pour former les métriques globales.

1) Pour distribuer manuellement le dataset, on peut utiliser la méthode .experimental_distribute_dataset(dataset) qui prend en argument un tf.data.Dataset et s'occupe de découper équitablement le dataset pour chaque worker. Par conséquent, on commencera par définir le tf.data.Dataset.

Deux opérations importantes sur un tel dataset sont le mélange du dataset et son découpage en batch.

  1. Le découpage en batch est obligatoire, car on a besoin dans un premier temps de découper le dataset en batch dit global batch et ensuite lors de la distribution du dataset, ces global batches seront découpé en autant de mini-batches qu'il y a de workers.
  2. Le mélange est fortement recommandé pour l'entraînement d'un réseau de neurones, mais il faut faire très attention car 2 workers peuvent se retrouver avec deux mélanges différents du dataset. C'est pourquoi il est impératif de fixer une seed globale pour s'assurer que les même mélanges soit effectué sur les workers.
  3. Un dataset, contenant 12 objets, chacun décrit par 5 valeurs, aura comme élément de base un tenseur de shape (5,). Si on réalise un découpage en batch de 3, l'élément de base de ce nouveau dataset sera un tenseur de shape (3,5) et ce dataset contiendra au total 4 de ces tenseurs.
  4. Par conséquent, .shuffle().batch() et .batch().shuffle() ne produisent pas le même effet et l'opération recommandée est d'appliquer .shuffle().batch(). La deuxième opération mélange l'ordre des batches, mais le contenu à l'intérieur des batches reste identique d'un epoch à l'autre.
  5. Pour faciliter le calcul exact des gradients et des métriques, il est aussi recommandé de supprimer le dernier global batch si celui-ci n'est pas de taille global_batch_size : pour ce faire, utiliser l'argument optionnel drop_remainder de batch().
# Create a tf.data.Dataset
train_dataset = ...
# Set a global seed
tf.random.set_seed(1234)
# Shuffle tf.data.Dataset
train_dataset = train_dataset.shuffle(...)
# Batching using `global_batch_size`
train_dataset = train_dataset.batch(global_batch_size, drop_remainder=True)

Si toutefois, le batching est réalisé sans drop_remainder, vous pouvez tout de même réaliser la suppression du dernier batch manuellement.

# Remove the last global batch if it is smaller than the others
# We suppose that a global batch is a tuple of (inputs,labels)
train_dataset = train_dataset.filter(lambda x, y : len(x) == globa_batch_size)

On peut finalement obtenir notre tf.distribute.DistributedDataset. Lors de l'itération de ce dataset, un worker récupérera un tf.distribute.DistributedValues contenant autant de mini-batches qu'il a de GPUs, exemple :

# Suppose, we have 2 workers with 3 GPUs each
# Each global batch are separated into 6 mini-batches
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
# on one worker, distributed_batch contains 3 mini-batches from the same global batch
# on the other worker, the distributed_batch contains the 3 remaining mini-batches from the same global batch
distributed_batch = next(iter(train_dist_dataset))

next dans ce contexte précis est une opération synchrone, si 1 worker ne fait pas cette appel, les autres workers seront bloqués.

2) Passons maintenant à la boucle d'entraînement. On va considérer le cas particulier et recommandé sur Jean-Zay d'un GPU par worker. On va définir la fonction train_epoch() qui va lancer un entraînement sur un epoch :

@tf.function
def train_epoch(): # ------------------------------------------------------------------- train 1 epoch
 
  def step_fn(mini_batch): # ------------------------------------------------------------ train 1 step
    x, y = mini_batch
    with tf.GradientTape() as tape:
      # compute predictions
      predictions = model(x, training=True)
      # compute the loss of each input in the mini-batch without reduction
      reduction = tf.keras.losses.Reduction.NONE
      losses_mini_batch_per_gpu = tf.keras.losses.SparseCategoricalCrossentropy(reduction=reduction)(
                                                                                        y, predictions)
      # sum all the individual losses and divide by `global_batch_size`
      loss_mini_batch_per_gpu = tf.nn.compute_average_loss(losses_mini_batch_per_gpu,
                                                                   global_batch_size=global_batch_size)
      # compute the gradient
      grads = tape.gradient(loss_mini_batch_per_gpu, model.trainable_variables)
      # inside MultiWorkerMirroredStrategy, `grads` will be summed across all GPUs first
      # before updating the parameters
      optimizer.apply_gradients(zip(grads, model.trainable_variables))
 
      return loss_mini_batch_per_gpu
 
  # loss computed on the whole dataset
  total_loss_gpu = 0.0
  train_n_batches = 0
  for distributed_batch in train_dist_dataset:
    train_n_batches += 1
    total_loss_gpu += strategy.run(step_fn, args=(distributed_batch,))
  # we get the global loss across the train dataset for 1 GPU
  total_loss_gpu /= float(train_n_batches)
 
  # `strategy.reduce()` will sum the given value across GPUs and return result on current device
  return strategy.reduce(tf.distribute.ReduceOp.SUM, total_loss_gpu, axis=None)

Quelques observations à faire :

  1. le décorateur @tf.function est une optimisation recommandée lorsqu'on fait des appels à strategy.run()
  2. dans step_fn(), on utilise 1 GPU pour calculer la loss de chaque échantillon d'1 mini batch. Il est important ensuite de sommer ces losses et de diviser la somme par la taille du batch global et non la taille du mini-batch, puisque automatiquement, tf va calculer le gradient global en faisant une somme des gradients locaux
  3. dans le cas d'au moins 2 GPU par worker, le distributed_batch contient plusieurs mini-batches et strategy.run() permet de répartir les mini-batches sur les GPUs
  4. total_loss_gpu ne contient que la loss calculée sur la portion du dataset attribuée au GPU, le .reduce() permet ensuite d'obtenir la loss sur 1 epoch

3) En ce qui concerne la partie de calcul des métriques lors de l'évaluation du modèle. Elle ressemble fortement à l'entraînement en enlevant la partie sur le calcul du gradient et la mise à jour des poids.

Configuration de l'environnement de calcul

Modules Jean Zay

Pour obtenir des performances optimales en exécution distribuée sur Jean Zay, il faut charger l'un des modules suivants :

  • tensorflow-gpu/py3/2.4.0-noMKL
  • tensorflow-gpu/py3/2.5.0+nccl-2.8.3
  • n'importe quel module de version ≥ 2.6.0

Attention : dans les autres environnements Jean Zay, la librairie TensorFlow a été compilée avec certaines options qui peuvent entraîner des pertes de performance non négligeables.

Configuration de la réservation Slurm

Une réservation Slurm correcte contient autant de tâches Slurm que de workers car le script python doit être exécuté sur chaque worker. Nous associons ici un worker par GPU, donc autant de tâches Slurm que de GPU.

Attention : TensorFlow cherche par défaut à utiliser le protocole HTTP. Pour empêcher cela, il faut désactiver le proxy HTTP de Jean Zay en supprimant les variables d'environnement http_proxy, https_proxy, HTTP_PROXY et HTTPS_PROXY.

Voici un exemple de réservation sur 2 nœuds quadri-GPU :

#!/bin/bash
#SBATCH --job-name=tf_distributed     
#SBATCH --nodes=2 #------------------------- number of nodes 
#SBATCH --ntasks=8 #------------------------ number of tasks / workers
#SBATCH --gres=gpu:4 #---------------------- number of GPUs per node
#SBATCH --cpus-per-task=10           
#SBATCH --hint=nomultithread          
#SBATCH --time=01:00:00              
#SBATCH --output=tf_distributed%j.out 
#SBATCH --error=tf_distributed%j.err  
 
# deactivate the HTTP proxy
unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY
 
# activate the environment
module purge
module load tensorflow-gpu/py3/2.6.0
 
srun python3 -u script.py

Exemple d'application

Exécution multi-GPU et multi-nœuds avec la MultiWorkerMirroredStrategy

Un exemple sous forme de Notebook se trouve dans $DSDIR/examples_IA/Tensorflow_parallel/Example_DataParallelism_TensorFlow.ipynb sur Jean-Zay. Vous pouvez aussi le télécharger en cliquant sur ce lien.

L'exemple est un Notebook dans lequel les entraînements présentés ci-dessus sont implémentés et exécutés sur 1, 2, 4 et 8 GPU. Les exemples se basent sur le modèle ResNet50 et la base de données CIFAR-10.

Vous devez d'abord récupérer le Notebook dans votre espace personnel (par exemple sur votre $WORK) :

$ cp $DSDIR/examples_IA/Tensorflow_parallel/Example_DataParallelism_TensorFlow.ipynb $WORK

Vous pouvez ensuite exécuter le Notebook à partir d'une machine frontale de Jean Zay en chargeant préalablement un module TensorFlow (voir notre documentation sur l'accès à JupyterHub pour en savoir plus sur l'usage des Notebooks sur Jean Zay).

Sources et documentation