Horovod: Multi-GPU and multi-node data parallelism

Horovod is a software library which enables data parallelism for TensorFlow, Keras, PyTorch and Apache MXNet. The objective of Horovod is to make the code efficient and easy to implement.

In the examples from the AI community, Horovod is often used with TensorFlow to facilitate the implementation of data parallelism.

Horovod draws on the MPI and NCCL communication libraries, as well as on the Intel Omni-Path (OPA) interconnection network, to exchange data between the devices (nodes, GPUs, CPUs).

[Figure: Description of the Horovod stack]

We document here the Horovod solution on Jean Zay for TensorFlow and PyTorch. This is a multi-process parallelism which works equally well in mono-node and in multi-node.

Multi-process configuration with SLURM

One of the advantages of Horovod is that no manual description of the distributed environment is necessary. Horovod retrieves the information relative to the GPUs, the available machines and the communication protocols directly from the machine environment. In particular, the Horovod solution ensures the portability of your code.

In SLURM, when we launch a script with the srun command, the script is automatically distributed on all the predefined tasks. For example, if we reserve 4 eight-GPU nodes and request 3 GPUs per node, we obtain:

  • 4 nodes, indexed from 0 to 3.
  • 3 GPUs/node indexed from 0 to 2 on each node.
  • 4 x 3 = 12 processes in total, allowing the execution of 12 tasks with the ranks from 0 to 11.

[Figure: Illustration of a SLURM reservation of 4 nodes and 3 GPUs per node, i.e. 12 processes.]
The collective inter-node communications are managed by the NCCL library.
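
As an illustration, here is a minimal sketch (not taken from the reference scripts below) showing the rank information that each process retrieves automatically once the script is launched with srun:

import horovod.tensorflow as hvd

# Initialize Horovod: each task created by srun becomes one Horovod process
hvd.init()

# Global rank (0 to 11 in the example above), local rank on the node (0 to 2)
# and total number of processes (12) are read from the environment
print('Global rank {}/{}, local rank {}'.format(hvd.rank(), hvd.size(), hvd.local_rank()))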

To execute a code distributed with Horovod in the SLURM environment, it is necessary to reserve one task per GPU involved in the data parallelism.

Here are examples of SLURM script headers for:

  • A reservation of N four-GPU nodes via the GPU partition by default:
    #SBATCH --nodes=N            # total number of nodes (N to be defined)
    #SBATCH --ntasks-per-node=4  # number of tasks per node (here 4, or 1 task per reserved GPU)
    #SBATCH --gres=gpu:4         # number of GPUs reserved per node (here 4, or all the GPUs of a node)
    #SBATCH --cpus-per-task=10   # number of cores per task (here, we reserve 4x10 = 40 cores per node)
    #SBATCH --hint=nomultithread
     
    # execute script
    srun python script.py


    Comment: Here, the nodes are reserved exclusively because we reserve all of their cores and GPUs. Notably, this gives access to the entire memory of each node.

  • A reservation of N eight-GPU nodes via the gpu_p2 partition:
    #SBATCH --partition=gpu_p2   # must be specified to be used
    #SBATCH --nodes=N            # total number of nodes (N to be defined)
    #SBATCH --ntasks-per-node=8  # number of tasks per node (here 8, or 1 task per reserved GPU)
    #SBATCH --gres=gpu:8         # number of GPUs reserved per node (here 8, or all the GPUs of a node)
    #SBATCH --cpus-per-task=3    # number of cores per task (here, we reserve 8x3 = 24 cores per node)
    #SBATCH --hint=nomultithread
     
    # execute script
    srun python script.py


    Comment: Here, the nodes are reserved exclusively because we reserve all of their cores and GPUs. Notably, this gives access to the entire memory of each node.

Implementation of the Horovod solution

It is possible to use Horovod with the different frameworks mentioned above (TensorFlow, with or without Keras; PyTorch; Apache MXNet).

We refer you to the Horovod documentation for implementation details relative to each software configuration.

In each case, the code differs slightly but the development steps remain identical. It is necessary to:

  1. Import and initialize Horovod.
  2. Pin each GPU to a distinct process.
  3. Increase the learning rate proportionally to the number of GPUs to compensate for the increase in batch size.
  4. Distribute the optimizer.
  5. Ensure that the variables are correctly replicated on all the GPUs at the beginning of the training (and after initialization of the model).
  6. Save the checkpoints on the process of rank 0 only.

For example, the Horovod solution implemented in a TensorFlow 2 + Keras code is shown in the following script:

import tensorflow as tf
import horovod.tensorflow.keras as hvd
 
# Initialize Horovod
hvd.init()
 
# Pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
 
# Build model and dataset
dataset = ...
model = ...
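 
# Horovod: increase the learning rate proportionally to the number of GPUs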
opt = tf.optimizers.Adam(0.001 * hvd.size())
 
# Horovod: add Horovod DistributedOptimizer.
opt = hvd.DistributedOptimizer(opt)
 
# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
              optimizer=opt,
              metrics=['accuracy'],
              experimental_run_tf_function=False)
callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
 
# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))
 
model.fit(dataset,
          steps_per_epoch=500 // hvd.size(),
          callbacks=callbacks,
          epochs=24,
          verbose=1 if hvd.rank() == 0 else 0)
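
For PyTorch, the same development steps apply. Below is a minimal sketch following the general pattern of the Horovod documentation; the dataset, the model, the hyperparameters and the training loop are placeholders to be adapted to your own code:

import torch
import horovod.torch as hvd

# Initialize Horovod
hvd.init()

# Pin each process to a distinct GPU (one GPU per process)
torch.cuda.set_device(hvd.local_rank())

# Build dataset and model (to be defined), then move the model onto the GPU
dataset = ...
model = ...
model.cuda()

# Distribute the batches of the dataset among the processes
sampler = torch.utils.data.distributed.DistributedSampler(
    dataset, num_replicas=hvd.size(), rank=hvd.rank())
loader = torch.utils.data.DataLoader(dataset, batch_size=128, sampler=sampler)

# Increase the learning rate proportionally to the number of GPUs
optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size())

# Horovod: wrap the optimizer so that gradients are averaged between processes
optimizer = hvd.DistributedOptimizer(optimizer,
                                     named_parameters=model.named_parameters())

# Horovod: broadcast initial parameters and optimizer state from rank 0
# to ensure consistent initialization of all workers
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

# ... training loop ...

# Save the checkpoints on the process of rank 0 only
if hvd.rank() == 0:
    torch.save(model.state_dict(), './checkpoint.pt')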

Distributed validation

The validation step, executed after each epoch or after a set number of training iterations, can be distributed over all the GPUs involved in the model training. When data parallelism is used and the validation dataset is large, distributing the validation over the GPUs seems to be the most efficient and fastest solution.

Here, the goal of distributed validation is to calculate the metrics (loss, accuracy, etc) per batch and per GPU, and then to weight and average them out for the entire validation dataset.
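
In other words (writing loss_b for the loss of batch b computed on GPU g and n_b for its size), this amounts to computing:

\[
\text{val\_loss}_{g} = \frac{\sum_b n_b \, \text{loss}_b}{\sum_b n_b}
\qquad\text{and}\qquad
\text{val\_loss} = \frac{1}{N} \sum_{g=1}^{N} \text{val\_loss}_{g}
\]

with N the number of GPUs.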

For this, it is necessary to:

  1. Load the validation dataset in the same way as for the training datasets but without the randomized transformations such as data augmentation or shuffling (see the documentation on database loading in TensorFlow):
    # validation dataset loading (imagenet for example)                  
    val_dataset = tf.data.TFRecordDataset(glob.glob(os.environ['DSDIR']+'/imagenet/validation*'))
    # define distributed sharding for validation
    val_dataset = val_dataset.shard(hvd.size(), hvd.rank())
    # define dataloader for validation
    val_dataset = val_dataset.map(tf_parse_eval,
          num_parallel_calls=idr_tf.cpus_per_task).batch(batch_size).prefetch(buffer_size=5)
  2. Switch from “learning” mode to “validation” mode to deactivate certain functionalities specific to the training which are costly and not needed here:
    • model(images, training=False) during model evaluation to switch the model into “validation” mode and deactivate the management of dropouts, batchnorms, etc.
    • Omit tf.GradientTape() so that no gradient calculation is performed
  3. Evaluate the model and calculate the metric per batch in the usual way (here, we take the example of the loss calculation; it will be the same for the other metrics):
    • logits_ = model(images, training=False) followed by loss_value = loss(labels, logits_)
  4. Weight and average the metric(s) per GPU:
    • val_loss.update_state(loss_value, sample_weight=images.shape[0]) with images.shape[0] as the batch size. Since the batches are not necessarily all the same size (the last batch is sometimes smaller), it is preferable to use the images.shape[0] value here.
  5. Compute the mean of the weighted metric averages of all the GPUs:
    • hvd.allreduce(val_loss.result()) to average the metric values calculated per GPU and communicate the result to all the GPUs. This operation uses inter-GPU communications. Horovod uses the average as the allreduce operation by default.

Example after loading validation data:

loss = tf.losses.SparseCategoricalCrossentropy() # Define loss function
 
val_loss = tf.keras.metrics.Mean()               # Define Keras metrics
 
@tf.function                                     # Define validation step function
def eval_step(images, labels):
    logits_ = model(images, training=False)      # evaluate model - switch into validation mode     
    loss_value = loss(labels, logits_)           # compute loss
    val_loss.update_state(loss_value, sample_weight=images.shape[0]) # accumulate weighted mean per GPU
 
val_loss.reset_states()                          # initialize val_loss value
 
for val_images, val_labels in val_loader:
    eval_step(val_images, val_labels)            # Call eval_step function
 
val_loss_dist = hvd.allreduce(val_loss.result()) # Average weighted means and broadcast value to each GPU
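
The approach is the same for the other metrics. For example, here is a possible sketch (reusing the model, loss, val_loss and val_loader defined above) which adds a distributed accuracy:

val_acc = tf.keras.metrics.SparseCategoricalAccuracy() # Define accuracy metric

@tf.function
def eval_step(images, labels):
    logits_ = model(images, training=False)            # evaluate model in validation mode
    val_loss.update_state(loss(labels, logits_), sample_weight=images.shape[0])
    val_acc.update_state(labels, logits_)              # accuracy is already averaged per sample internally

val_loss.reset_states()
val_acc.reset_states()

for val_images, val_labels in val_loader:
    eval_step(val_images, val_labels)

val_loss_dist = hvd.allreduce(val_loss.result())       # Average weighted losses of all GPUs
val_acc_dist = hvd.allreduce(val_acc.result())         # Average accuracies of all GPUs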

Application example

An example is found on Jean Zay in $DSDIR/examples_IA/Horovod_parallel/Example_DataParallelism_Horovod_Tensorflow-eng.ipynb; it uses the MNIST database and a simple dense network. The example is a notebook which you can use to create an execution script.

You can also download the notebook by clicking on this link.

The notebook should be copied into your personal space (ideally into your $WORK):

$ cp $DSDIR/examples_IA/Horovod_parallel/Example_DataParallelism_Horovod_Tensorflow-eng.ipynb $WORK

You should then execute the Notebook from a Jean Zay front end after first loading a TensorFlow module (see our documentation on Access to JupyterHub for more information on using the Notebooks on Jean Zay).

Documentation and sources