Horovod: Multi-GPU and multi-node data parallelism
Horovod is a software library that enables data parallelism for TensorFlow, Keras, PyTorch, and Apache MXNet. Its objective is to make distributed code efficient and easy to implement.
In the examples from the AI community, Horovod is often used with TensorFlow to facilitate the implementation of data parallelism.
Horovod draws on the MPI and NCCL communication libraries, as well as the Intel Omni-Path (OPA) interconnection network, to exchange data between the devices (nodes, GPUs, CPUs).
This page documents the Horovod solutions available on Jean Zay for TensorFlow and PyTorch. Horovod relies on multi-process parallelism, which works equally well on a single node and on multiple nodes.
Multi-process configuration with SLURM
One of the advantages of Horovod is that the distributed environment requires no manual configuration. Horovod retrieves the information about the GPUs, the available machines and the communication protocols directly from the machine environment. In particular, this makes your Horovod code portable.
In SLURM, when a script is launched with the srun command, it is automatically distributed over all the predefined tasks. For example, if we reserve 4 eight-GPU nodes and request 3 GPUs per node, we obtain:
- 4 nodes, indexed from 0 to 3.
- 3 GPUs/node indexed from 0 to 2 on each node.
- 4 x 3 = 12 processes in total, allowing the execution of 12 tasks with the ranks from 0 to 11.
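As a rough illustration of this layout, the sketch below enumerates the (node, local GPU) pairs and the corresponding global ranks. It assumes that ranks are numbered node by node (block distribution), which is not stated in the text above, and the function name `enumerate_ranks` is hypothetical.

```python
def enumerate_ranks(n_nodes, gpus_per_node):
    """Return (global_rank, node_index, local_rank) triples.

    Assumption: ranks are numbered node by node (block distribution).
    """
    layout = []
    for node in range(n_nodes):
        for local_rank in range(gpus_per_node):
            global_rank = node * gpus_per_node + local_rank
            layout.append((global_rank, node, local_rank))
    return layout


# The reservation described above: 4 nodes, 3 GPUs requested per node
layout = enumerate_ranks(n_nodes=4, gpus_per_node=3)
for global_rank, node, local_rank in layout:
    print(f"rank {global_rank:2d} -> node {node}, local GPU {local_rank}")
```

In a Horovod script, the same three quantities are normally obtained from hvd.rank(), hvd.local_rank() and hvd.size() rather than computed by hand.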
The collective inter-node communications are managed by the NCCL library.
To execute a code distributed with Horovod in the SLURM environment, it is necessary to reserve one task per GPU involved in the data parallelism.
Here are examples of SLURM script headers for:
- A reservation of N four-GPU nodes via the default GPU partition:
  #SBATCH --nodes=N            # total number of nodes (N to be defined)
  #SBATCH --ntasks-per-node=4  # number of tasks per node (here 4, or 1 task per reserved GPU)
  #SBATCH --gres=gpu:4         # number of GPUs reserved per node (here 4, or all the GPUs of a node)
  #SBATCH --cpus-per-task=10   # number of cores per task (here, we reserve 4x10 = 40 cores per node)
  #SBATCH --hint=nomultithread
  # execute script
  srun python script.py
  Comment: Here, the nodes are reserved exclusively because we are reserving the totality of the cores and GPUs. Notably, this gives access to the entire memory of each node.
- A reservation of N eight-GPU nodes via the gpu_p2 partition:
  #SBATCH --partition=gpu_p2   # must be specified to be used
  #SBATCH --nodes=N            # total number of nodes (N to be defined)
  #SBATCH --ntasks-per-node=8  # number of tasks per node (here, 8 or 1 task per reserved GPU)
  #SBATCH --gres=gpu:8         # number of GPUs per node (here, 8 or all the GPUs of a node)
  #SBATCH --cpus-per-task=3    # number of cores per task (here, we reserve 8x3 = 24 cores per node)
  #SBATCH --hint=nomultithread
  # execute script
  srun python script.py
  Comment: Here, the nodes are reserved exclusively because we are reserving the totality of the cores and GPUs. Notably, this gives access to the entire memory of each node.
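To check that a reservation really provides one task per GPU, it can help to print the task layout seen by each process. The sketch below is only a sanity check and is not required by Horovod. It reads the SLURM_NTASKS, SLURM_PROCID and SLURM_LOCALID environment variables set by srun; the helper name `slurm_task_info` and the example values are hypothetical.

```python
import os


def slurm_task_info(env=None):
    """Read the task layout that SLURM exposes to each process.

    The default values apply when the script runs outside of SLURM.
    """
    env = os.environ if env is None else env
    return {
        "world_size": int(env.get("SLURM_NTASKS", 1)),
        "rank": int(env.get("SLURM_PROCID", 0)),
        "local_rank": int(env.get("SLURM_LOCALID", 0)),
    }


# Hypothetical environment of the last task of a 2-node x 4-GPU job
example_env = {"SLURM_NTASKS": "8", "SLURM_PROCID": "7", "SLURM_LOCALID": "3"}
info = slurm_task_info(example_env)
print(info)
```

Calling slurm_task_info() without an argument inside a job launched with srun reports the values of the current task.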
Implementation of the Horovod solution
It is possible to use Horovod in the following software configurations:
- TensorFlow 2 and Keras - See the Horovod Keras documentation
- TensorFlow 2 alone - See the Horovod TensorFlow documentation
- PyTorch - See the Horovod PyTorch documentation
- TensorFlow 1 and Keras - See the Horovod Keras documentation
- TensorFlow 1 alone - See the Horovod TensorFlow documentation
We refer you to the Horovod documentation for implementation details relative to each software configuration.
In each case, the code differs slightly but the development steps remain identical. It is necessary to:
- Import and initialize Horovod.
- Pin each GPU to a distinct process.
- Increase the learning rate proportionally to the number of GPUs to compensate for the increase in batch size.
- Distribute the optimizer.
- Ensure that the variables are correctly replicated on all the GPUs at the beginning of the training (and after initialization of the model).
- Save the checkpoints on the process of rank 0 only.
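The learning rate step in the list above can be illustrated with plain arithmetic. This is a minimal sketch assuming linear scaling, with hypothetical values for the per-GPU batch size and the number of GPUs; the function names are not part of Horovod.

```python
def scaled_learning_rate(base_lr, n_gpus):
    """Linear scaling: the learning rate grows with the number of GPUs."""
    return base_lr * n_gpus


def global_batch_size(per_gpu_batch_size, n_gpus):
    """Each GPU processes its own batch, so the effective batch is the sum."""
    return per_gpu_batch_size * n_gpus


n_gpus = 8  # would be hvd.size() in a Horovod script
lr = scaled_learning_rate(0.001, n_gpus)
batch = global_batch_size(64, n_gpus)
print(f"learning rate: {lr}, global batch size: {batch}")
```

Linear scaling is a common starting point rather than a rule; the best learning rate for a large global batch may still need tuning.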
For example, the Horovod solution implemented in a TensorFlow 2 + Keras code is shown in the following script:
import tensorflow as tf
import horovod.tensorflow.keras as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Build model and dataset
dataset = ...
model = ...
opt = tf.optimizers.Adam(0.001 * hvd.size())

# Horovod: add Horovod DistributedOptimizer.
opt = hvd.DistributedOptimizer(opt)

# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
              optimizer=opt,
              metrics=['accuracy'],
              experimental_run_tf_function=False)

callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]

# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))

model.fit(dataset,
          steps_per_epoch=500 // hvd.size(),
          callbacks=callbacks,
          epochs=24,
          verbose=1 if hvd.rank() == 0 else 0)
Distributed validation
The validation step, executed after each epoch or after a set number of training iterations, can be distributed over all the GPUs involved in training the model. When data parallelism is used and the validation dataset is large, distributing the validation over the GPUs is generally the most efficient and fastest solution.
Here, the goal of distributed validation is to calculate the metrics (loss, accuracy, etc.) per batch and per GPU, and then to weight and average them over the entire validation dataset.
For this, it is necessary to:
- Load the validation dataset in the same way as for the training datasets but without the randomized transformations such as data augmentation or shuffling (see the documentation on database loading in TensorFlow):
  # validation dataset loading (imagenet for example)
  val_dataset = tf.data.TFRecordDataset(glob.glob(os.environ['DSDIR']+'/imagenet/validation*'))
  # define distributed sharding for validation
  val_dataset = val_dataset.shard(hvd.size(), hvd.rank())
  # define dataloader for validation
  val_dataset = val_dataset.map(tf_parse_eval, num_parallel_calls=idr_tf.cpus_per_task).batch(batch_size).prefetch(buffer_size=5)
- Switch from “learning” mode to “validation” mode to deactivate certain functionalities specific to the training which are costly and not needed here:
  - Call model(images, training=False) during model evaluation to switch the model into “validation” mode and deactivate the management of dropouts, batchnorms, etc.
  - Do not use tf.GradientTape(), so that the gradients are not calculated.
- Evaluate the model and calculate the metric per batch in the usual way. (Here, we are taking the example of loss calculation. It will be the same for the other metrics.):
  logits_ = model(images, training=False)
  followed by
  loss_value = loss(labels, logits_)
- Weight and average the metric(s) per GPU:
  val_loss.update_state(loss_value, sample_weight=images.shape[0])
  with images.shape[0] as the batch size. Since the batches are not necessarily the same size (the last batch is sometimes smaller), it is preferable to use the images.shape[0] value here.
- Find the mean of the weighted metric averages of all the GPUs:
  hvd.allreduce(val_loss.result())
  to average the metric values calculated per GPU and communicate the result to all the GPUs. This operation uses inter-GPU communications. Horovod uses the mean as the allreduce operation by default.
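The sharding used in the first item of the list above can be pictured with a plain Python list. The sketch below mimics, as an assumption about its behaviour, what val_dataset.shard(hvd.size(), hvd.rank()) does: each process keeps every n-th record, starting at its own rank. The helper `shard` and the sample values are hypothetical stand-ins, not the TensorFlow implementation.

```python
def shard(items, num_shards, index):
    """Keep every num_shards-th element, starting at position index."""
    return items[index::num_shards]


samples = list(range(10))  # stand-in for the validation records
num_workers = 4            # would be hvd.size()
shards = [shard(samples, num_workers, rank) for rank in range(num_workers)]
for rank, part in enumerate(shards):
    print(f"rank {rank}: {part}")
```

Every record lands in exactly one shard, but the shards can differ in size when the number of records is not a multiple of the number of processes.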
Example after loading the validation data:
loss = tf.losses.SparseCategoricalCrossentropy()  # Define loss function
val_loss = tf.keras.metrics.Mean()                # Define Keras metrics

@tf.function                                      # Define validation step function
def eval_step(images, labels):
    logits_ = model(images, training=False)       # evaluate model - switch into validation mode
    loss_value = loss(labels, logits_)            # compute loss
    val_loss.update_state(loss_value, sample_weight=images.shape[0])  # cumulate weighted mean per GPU

val_loss.reset_states()                           # initialize val_loss value
for val_images, val_labels in val_dataset:
    eval_step(val_images, val_labels)             # Call eval_step function
val_loss_dist = hvd.allreduce(val_loss.result())  # Average weighted means and broadcast value to each GPU
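The weighting and averaging performed above can be checked with plain arithmetic. This minimal sketch uses hypothetical per-batch losses and batch sizes for two GPUs, and replaces hvd.allreduce with a simple average; the helper `weighted_mean` is not part of Keras or Horovod.

```python
def weighted_mean(batch_means, batch_sizes):
    """Mean over all samples, from per-batch means weighted by batch size."""
    total = sum(m * n for m, n in zip(batch_means, batch_sizes))
    return total / sum(batch_sizes)


# Hypothetical per-batch mean losses and batch sizes on two GPUs;
# the last batch of each GPU is smaller than the others.
gpu0 = weighted_mean([0.50, 0.30, 0.40], [32, 32, 16])
gpu1 = weighted_mean([0.70, 0.50, 0.60], [32, 32, 16])

# Stand-in for hvd.allreduce with its default averaging
val_loss_dist = (gpu0 + gpu1) / 2
print(gpu0, gpu1, val_loss_dist)
```

The average of the per-GPU means equals the mean over the whole validation dataset only when every GPU has processed the same number of samples. With shards of unequal size the result is an approximation.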
Application example
An example is available on Jean Zay in $DSDIR/examples_IA/Horovod_parallel/Example_DataParallelism_Horovod_Tensorflow-eng.ipynb. It uses the MNIST database and a simple dense network. The example is a Notebook which you can use to create an execution script.
You can also download the notebook by clicking on this link.
This should be copied into your personal space (ideally in your $WORK):
$ cp $DSDIR/examples_IA/Horovod_parallel/Example_DataParallelism_Horovod_Tensorflow-eng.ipynb $WORK
You should then execute the Notebook from a Jean Zay front end after first loading a TensorFlow module (see our documentation on Access to JupyterHub for more information on using the Notebooks on Jean Zay).