Parallélisme de données avec TensorFlow
Cette page explique comment distribuer sur plusieurs GPU un modèle de réseau de neurones implémenté dans un code TensorFlow selon la méthode de parallélisme de données.
Un exemple applicatif est proposé sous forme d'un Notebook en bas de page pour vous permettre d'accéder à une implémentation fonctionnelle des explications données ci-dessous.
Implémentation d'une stratégie de distribution
Pour distribuer un modèle en TensorFlow, on définit une stratégie de distribution en créant une instance de la classe tf.distribute.Strategy. Cette stratégie permet de contrôler la manière dont sont répartis les données et les calculs sur les GPU.
Choix de la stratégie
MultiWorkerMirroredStrategy
TensorFlow fournit plusieurs stratégies pré-implémentées. Dans cette documentation, nous présentons seulement la stratégie tf.distribute.MultiWorkerMirroredStrategy. Celle-ci a l'avantage d'être générique dans le sens où elle permet le passage aussi bien en multi-GPU qu'en multi-nœuds, sans perte de performance par rapport aux autres stratégies testées.
En utilisant la stratégie MultiWorkerMirroredStrategy, les variables du modèle sont répliquées sur l'ensemble des GPU détectés. Chaque GPU traite une partie des données (mini-batch) et des opérations de réduction collectives sont effectuées pour agréger les tenseurs et mettre à jour les variables sur chaque GPU à chaque étape.
Environnement multi-workers
La MultiWorkerMirroredStrategy est une version multi-workers de la stratégie tf.distribute.MirroredStrategy. Elle s'exécute sur plusieurs tâches (ou workers), chacune d'entre elles étant assimilée à un nom d'hôte et un numéro de port. L'ensemble des tâches constitue un cluster sur lequel se base la stratégie de distribution pour synchroniser les GPU.
La notion de worker (et de cluster) permet notamment une exécution sur plusieurs nœuds de calcul. Chaque worker peut être associé à un ou plusieurs GPU. Sur Jean Zay, nous conseillons de définir un worker par GPU.
L'environnement multi-workers d'une exécution est automatiquement détectée à partir des variables Slurm définies dans votre script de soumission grâce à la classe tf.distribute.cluster_resolver.SlurmClusterResolver.
La stratégie MultiWorkerMirroredStrategy peut se baser sur deux types de protocoles de communications inter-GPU : gRPC ou NCCL. Sur Jean Zay, il est conseillé de demander l'utilisation du protocole de communication NCCL pour obtenir les meilleures performances.
Déclaration
La déclaration de la stratégie MultiWorkerMirroredStrategy se fait en quelques lignes :
# build multi-worker environment from Slurm variables
cluster_resolver = tf.distribute.cluster_resolver.SlurmClusterResolver(port_base=12345)
# use NCCL communication protocol
implementation = tf.distribute.experimental.CommunicationImplementation.NCCL
communication_options = tf.distribute.experimental.CommunicationOptions(implementation=implementation)
# declare distribution strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy(cluster_resolver=cluster_resolver,
communication_options=communication_options)
Sur Jean Zay, vous pouvez exploiter les numéros de port compris entre 10000 et 20000.
Il existe une limitation TensorFlow sur la déclaration de la stratégie, celle-ci doit être faite avant tout autre appel à une opération TensorFlow.
Intégration au modèle d'apprentissage
Pour répliquer un modèle sur plusieurs GPU, celui-ci doit être créé dans le contexte strategy.scope().
La méthode .scope() fournit un gestionnaire de contexte qui capture les variables TensorFlow et les communique à chaque GPU en fonction de la stratégie choisie. On y déclare les éléments qui créent les variables relatives au modèle : le chargement d'un modèle enregistré, la déclaration d'un modèle, la fonction model.compile(), l'optimiseur,...
Voici un exemple de déclaration d'un modèle à répliquer sur l'ensemble des GPU :
# get total number of workers
n_workers = int(os.environ['SLURM_NTASKS'])
# define batch size
batch_size_per_gpu = 64
global_batch_size = batch_size_per_gpu * n_workers
# load dataset
dataset = tf.data.Dataset.from_tensor_slices(...)
# [...]
dataset = dataset.batch(global_batch_size)
# model building/compiling need to be within `strategy.scope()`
with strategy.scope():
multi_worker_model = tf.keras.Sequential([
tf.keras.Input(shape=(28, 28)),
tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
#...
])
multi_worker_model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.SGD(learning_rate=0.001*n_workers),
metrics=['accuracy'])
- Lors de la création de l'objet
dataset, la taille de batch à spécifier est la taille de batch globale et non la taille de batch par GPU. - L'utilisation de certains optimiseurs comme SGD nécessite un ajustement du learning rate proportionnel à la taille de batch globale, et donc au nombre de GPU.
Entraînement distribué d'un modèle de type tf.keras.Model
La fonction model.fit() de la librairie Keras prend en charge automatiquement la distribution de l'entraînement selon la stratégie choisie. L'entraînement est donc lancé de manière habituelle. Par exemple :
multi_worker_model.fit(train_dataset, epochs=10, steps_per_epoch=100)
- La distribution des données d'entrées sur les différents processus (data sharding) est gérée automatiquement au sein de la fonction
model.fit(). - L'étape d'évaluation se fait automatiquement en mode distribué en passant le dataset d'évaluation à la fonction
model.fit():
multi_worker_model.fit(train_dataset, epochs=3, steps_per_epoch=100, validation_data=valid_dataset)
Entraînement distribué avec une boucle personnalisée (Custom Loop)
Pour avoir un plus grand contrôle sur l'entraînement du modèle et son évaluation, il est possible de passer outre l'appel à la méthode
.fit() et de définir soi-même sa boucle d'entraînement distribué. Pour cela, il faut faire attention à trois choses :
- distribuer correctement le dataset entre les workers et les GPU des workers ;
- entraîner son modèle seulement sur un mini-batch pour chaque GPU et agglomérer les gradients locaux pour former le gradient global ;
- distribuer correctement le calcul des métriques sur chaque GPU et agglomérer les métriques locales pour former les métriques globales.
1. Distribuer correctement le dataset entre les workers et les GPU des workers
Pour distribuer manuellement le dataset, on peut utiliser la méthode .experimental_distribute_dataset(dataset) qui prend en argument un tf.data.Dataset et s'occupe de découper équitablement le dataset pour chaque worker. Par conséquent, on commencera par définir le tf.data.Dataset.
Deux opérations importantes sur un tel dataset sont son découpage en batch et son mélange (shuffling).
- Le découpage en batch est obligatoire, car on a besoin, dans un premier temps, de découper le dataset en batch dit global batch. Ensuite lors de la distribution du dataset, ces global batches seront découpés en autant de mini-batches qu'il y a de workers.
- Le mélange est fortement recommandé pour l'entraînement d'un réseau de neurones, mais il faut faire très attention car 2 workers peuvent se retrouver avec deux mélanges différents du dataset. C'est pourquoi il est impératif de fixer une seed globale pour s'assurer que les même mélanges soit effectués sur les workers.
- L'ordre des opérations entre le découpage en batch et le mélange est important ! Les instructions
.shuffle().batch()et.batch().shuffle()ne produisent pas le même effet. Il est recommandé d'appliquer l'instruction.shuffle().batch(). La deuxième intruction mélange l'ordre des batches, mais le contenu à l'intérieur des batches reste identique d'une epoch à l'autre. - Pour faciliter le calcul exact des gradients et des métriques, il est recommandé de supprimer le dernier global batch si celui-ci n'est pas de taille
global_batch_size: pour ce faire, utiliser l'argument optionneldrop_remainderdebatch().
En résumé :
# Create a tf.data.Dataset
train_dataset = ...
# Set a global seed
tf.random.set_seed(1234)
# Shuffle tf.data.Dataset
train_dataset = train_dataset.shuffle(...)
# Batching using `global_batch_size`
train_dataset = train_dataset.batch(global_batch_size, drop_remainder=True)
Si le batching est réalisé sans drop_remainder, vous pouvez réaliser la suppression du dernier batch manuellement :
# Remove the last global batch if it is smaller than the others
# We suppose that a global batch is a tuple of (inputs,labels)
train_dataset = train_dataset.filter(lambda x, y : len(x) == globa_batch_size)
On peut finalement obtenir notre tf.distribute.DistributedDataset. Lors de l'itération de ce dataset, un worker récupérera un
tf.distribute.DistributedValues contenant autant de mini-batches qu'il a de GPU, par exemple :
# Suppose, we have 2 workers with 3 GPUs each
# Each global batch is separated into 6 mini-batches
train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)
# On one worker, distributed_batch contains 3 mini-batches from the same global batch
# On the other worker, the distributed_batch contains the 3 remaining mini-batches from the same global batch
distributed_batch = next(iter(train_dist_dataset))
Dans ce contexte précis, next est une opération synchrone, si 1 worker ne fait pas cette appel, les autres workers seront bloqués.
2. Entraîner son modèle seulement sur un mini-batch pour chaque GPU et agglomérer les gradients locaux pour former le gradient global
Passons maintenant à la boucle d'entraînement. On va considérer le cas particulier et recommandé sur Jean-Zay d'un GPU par worker. On va définir la fonction train_epoch() qui va lancer un entraînement sur une epoch :
@tf.function
def train_epoch(): # ------------------------------------------ train 1 epoch
def step_fn(mini_batch): # ---------------------------------- train 1 step
x, y = mini_batch
with tf.GradientTape() as tape:
# compute predictions
predictions = model(x, training=True)
# compute the loss of each input in the mini-batch without reduction
reduction = tf.keras.losses.Reduction.NONE
losses_mini_batch_per_gpu = tf.keras.losses.SparseCategoricalCrossentropy(reduction=reduction)(
y, predictions)
# sum all the individual losses and divide by `global_batch_size`
loss_mini_batch_per_gpu = tf.nn.compute_average_loss(losses_mini_batch_per_gpu,
global_batch_size=global_batch_size)
# compute the gradient
grads = tape.gradient(loss_mini_batch_per_gpu, model.trainable_variables)
# inside MultiWorkerMirroredStrategy, `grads` will be summed across all GPUs first
# before updating the parameters
optimizer.apply_gradients(zip(grads, model.trainable_variables))
return loss_mini_batch_per_gpu
# loss computed on the whole dataset
total_loss_gpu = 0.0
train_n_batches = 0
for distributed_batch in train_dist_dataset:
train_n_batches += 1
total_loss_gpu += strategy.run(step_fn, args=(distributed_batch,))
# we get the global loss across the train dataset for 1 GPU
total_loss_gpu /= float(train_n_batches)
# `strategy.reduce()` will sum the given value across GPUs and return result on current device
return strategy.reduce(tf.distribute.ReduceOp.SUM, total_loss_gpu, axis=None)
- Le décorateur
@tf.functionest une optimisation recommandée lorsqu'on fait des appels àstrategy.run(). - Dans
step_fn(), on utilise un GPU pour calculer la loss de chaque échantillon d'un mini batch. Il est important ensuite de sommer ces losses et de diviser la somme par la taille du batch global et non la taille du mini-batch, puisque automatiquement, TensorFlow va calculer le gradient global en faisant une somme des gradients locaux. - Dans le cas d'au moins 2 GPU par worker, le
distributed_batchcontient plusieurs mini-batches etstrategy.run()permet des répartir les mini-batches sur les GPU. total_loss_gpune contient que la loss calculée sur la portion du dataset attribuée au GPU, le.reduce()permet ensuite d'obtenir la loss sur une epoch
3. Distribuer correctement le calcul des métriques sur chaque GPU et agglomérer les métriques locales pour former les métriques globales
En ce qui concerne la partie de calcul des métriques lors de l'évaluation du modèle, elle ressemble fortement à l'entraînement en enlevant la partie sur le calcul du gradient et la mise à jour des poids.
Configuration de l'environnement de calcul
Modules Jean Zay
Pour obtenir des performances optimales en exécution distribuée sur Jean Zay, il faut charger l'un des modules suivants :
tensorflow-gpu/py3/2.4.0-noMKLtensorflow-gpu/py3/2.5.0+nccl-2.8.3- n'importe quel module de version ≥ 2.6.0
Dans les autres environnements Jean Zay, la librairie TensorFlow a été compilée avec certaines options qui peuvent entraîner des pertes de performance non négligeables.
Configuration de la réservation Slurm
Une réservation Slurm correcte contient autant de tâches Slurm que de workers car le script python doit être exécuté sur chaque worker. Nous associons ici un worker par GPU, donc autant de tâches Slurm que de GPU.
TensorFlow cherche par défaut à utiliser le protocole HTTP. Pour empêcher cela, il faut désactiver le proxy HTTP de Jean Zay en supprimant les variables d'environnement http_proxy, https_proxy, HTTP_PROXY et HTTPS_PROXY.
Voici un exemple de réservation sur 2 nœuds quadri-GPU :
#!/bin/bash
#SBATCH --job-name=tf_distributed
#SBATCH --nodes=2 #------------------------- number of nodes
#SBATCH --ntasks=8 #------------------------ number of tasks / workers
#SBATCH --gres=gpu:4 #---------------------- number of GPUs per node
#SBATCH --cpus-per-task=10
#SBATCH --hint=nomultithread
#SBATCH --time=01:00:00
#SBATCH --output=tf_distributed%j.out
#SBATCH --error=tf_distributed%j.err
# deactivate the HTTP proxy
unset http_proxy https_proxy HTTP_PROXY HTTPS_PROXY
# activate the environment
module purge
module load tensorflow-gpu/py3/2.6.0
srun python3 -u script.py
Exemple d'application
Exécution multi-GPU et multi-nœuds avec la MultiWorkerMirroredStrategy
Un exemple sous forme de Notebook se trouve dans $DSDIR/examples_IA/Tensorflow_parallel/Example_DataParallelism_TensorFlow.ipynb sur Jean-Zay.
L'exemple est un Notebook dans lequel les entraînements présentés ci-dessus sont implémentés et exécutés sur 1, 2, 4 et 8 GPU. Les exemples se basent sur le modèle ResNet50 et la base de données CIFAR-10.
Vous devez d'abord récupérer le Notebook dans votre espace personnel
(par exemple sur votre $WORK) :
cp $DSDIR/examples_IA/Tensorflow_parallel/Example_DataParallelism_TensorFlow.ipynb $WORK
Vous pouvez ensuite exécuter le Notebook à partir d'une machine frontale de Jean Zay en sélectionnant un noyau TensorFlow (voir notre documentation sur l'accès à JupyterHub pour en savoir plus sur l'usage des Notebooks sur Jean Zay).