Horovod: Multi-GPU and multi-node data parallelism

Horovod is a software library which enables data parallelism for TensorFlow, Keras, PyTorch, and Apache MXNet. The objective of Horovod is to make distributed code efficient and easy to implement.

In examples from the AI community, Horovod is often used with TensorFlow to facilitate the implementation of data parallelism.

Horovod draws on the MPI and NCCL communication libraries, as well as the Intel Omni-Path (OPA) interconnection network, to exchange data between the devices (nodes, GPUs, CPUs).

Description of the Horovod stack

Here we document the Horovod solutions in TensorFlow and in PyTorch on Jean Zay. This is a multi-process parallelism which functions equally well in mono-node and in multi-node.

Multi-process configuration with SLURM

One of the advantages of Horovod is that no manual configuration is necessary to describe the distributed environment. Horovod retrieves the information about the GPUs, the available machines and the communication protocols directly from the machine environment. In particular, the Horovod solution ensures the portability of your code.

When you launch a script in SLURM with the srun command, the script is automatically distributed on all the predefined tasks. For example, if we reserve 4 eight-GPU nodes and request 3 GPUs per node, we obtain:

  • 4 nodes, indexed from 0 to 3.
  • 3 GPUs/node indexed from 0 to 2 on each node.
  • 4 x 3 = 12 processes in total, allowing the execution of 12 tasks with the ranks from 0 to 11.

Multi-process with SLURM

Illustration of a SLURM reservation of 4 nodes and 3 GPUs per node, equalling 12 processes.
The collective inter-node communications are managed by the NCCL library.
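
For instance, a minimal check script (a sketch; the file name check_ranks.py is arbitrary), launched with srun python check_ranks.py, prints the ranks that Horovod derives from such a reservation:

import horovod.tensorflow as hvd

# Initialize Horovod; ranks and sizes are taken from the execution environment
hvd.init()

# With 4 nodes x 3 tasks per node: hvd.size() is 12, hvd.rank() runs from 0 to 11
# and hvd.local_rank() runs from 0 to 2 on each node
print(f"process {hvd.rank()}/{hvd.size()}, local rank {hvd.local_rank()}")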

To execute a distributed code with Horovod in the SLURM environment, it is necessary to reserve one task per GPU involved in the data parallelism.

Here are examples of SLURM script headers for:

  • A reservation of N four-GPU nodes via the default GPU partition:
    #SBATCH --nodes=N            # total number of nodes (N to be defined)
    #SBATCH --ntasks-per-node=4  # number of tasks per node (here 4, or 1 task per reserved GPU)
    #SBATCH --gres=gpu:4         # number of GPUs reserved per node (here 4, or all the GPUs of a node)
    #SBATCH --cpus-per-task=10   # number of cores per task (here, we reserve 4x10 = 40 cores per node)
    #SBATCH --hint=nomultithread
     
    # execute script
    srun python script.py


    Comment: Here, the nodes are reserved in exclusive mode because we are reserving all of the cores and GPUs of the nodes. In particular, this gives access to the entire memory of each node.

  • A reservation of N eight-GPU nodes via the gpu_p2 partition:
    #SBATCH --partition=gpu_p2   # must be specified to be used
    #SBATCH --nodes=N            # total number of nodes (N to be defined)
    #SBATCH --ntasks-per-node=8  # number of tasks per node (here 8, or 1 task per reserved GPU)
    #SBATCH --gres=gpu:8         # number of GPUs per node (here 8, or all the GPUs of a node)
    #SBATCH --cpus-per-task=3    # number of cores per task (here, we reserve 8x3 = 24 cores per node)
    #SBATCH --hint=nomultithread
     
    # execute script
    srun python script.py


    Comment: Here, the nodes are reserved in exclusive mode because we are reserving all of the cores and GPUs of the nodes. In particular, this gives access to the entire memory of each node.

Implementation of the Horovod solution

It is possible to use Horovod in several software configurations (for example, TensorFlow 2 with or without Keras, or PyTorch).

We refer you to the Horovod documentation for the implementation details relative to each software configuration.

In each case, the code differs slightly but the development steps remain identical. It is necessary to:

  1. Import and initialize Horovod.
  2. Pin each GPU to a distinct process.
  3. Increase the learning rate proportionally to the number of GPUs to compensate for the increase in the batch size.
  4. Distribute the optimizer.
  5. Ensure that the variables are correctly replicated on all the GPUs at the beginning of the training (and after initialization of the model).
  6. Save the checkpoints on the process of rank 0 only.

For example, the Horovod solution implemented in a TensorFlow 2 + Keras code is shown in the following script:

import tensorflow as tf
import horovod.tensorflow.keras as hvd
 
# Initialize Horovod
hvd.init()
 
# Pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
 
# Build model and dataset
dataset = ...
model = ...
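# Horovod: scale the learning rate proportionally to the number of GPUs (hvd.size())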
opt = tf.optimizers.Adam(0.001 * hvd.size())
 
# Horovod: add Horovod DistributedOptimizer.
opt = hvd.DistributedOptimizer(opt)
 
# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
              optimizer=opt,
              metrics=['accuracy'],
              experimental_run_tf_function=False)
callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]
 
# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))
 
model.fit(dataset,
          steps_per_epoch=500 // hvd.size(),
          callbacks=callbacks,
          epochs=24,
          verbose=1 if hvd.rank() == 0 else 0)
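
For PyTorch, the same six steps apply with the horovod.torch module. The sketch below is only an illustration under simple assumptions: the toy linear model, the random data and the checkpoint file name are placeholders standing in for your own network, DataLoader and paths.

import torch
import horovod.torch as hvd

# 1. Initialize Horovod
hvd.init()

# 2. Pin each process to a distinct GPU, identified by its local rank
torch.cuda.set_device(hvd.local_rank())

# Toy model and loss function (illustrative only)
model = torch.nn.Linear(784, 10).cuda()
loss_fn = torch.nn.CrossEntropyLoss()

# 3. Scale the learning rate by the number of GPUs
optimizer = torch.optim.SGD(model.parameters(), lr=0.01 * hvd.size())

# 4. Distribute the optimizer (gradients are averaged across processes)
optimizer = hvd.DistributedOptimizer(optimizer,
                                     named_parameters=model.named_parameters())

# 5. Replicate the initial variables of rank 0 on all the processes
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

# Training loop on random data (placeholder for a real DataLoader)
for step in range(100):
    images = torch.randn(32, 784).cuda()
    labels = torch.randint(0, 10, (32,)).cuda()
    optimizer.zero_grad()
    loss = loss_fn(model(images), labels)
    loss.backward()
    optimizer.step()

# 6. Save the checkpoints on the process of rank 0 only
if hvd.rank() == 0:
    torch.save(model.state_dict(), 'checkpoint.pt')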

Distributed validation

The validation step, executed after each epoch or after a set number of training iterations, can be distributed over all the GPUs involved in the model training. When data parallelism is used and the validation dataset is large, distributed validation on the GPUs appears to be the most efficient and fastest solution.

The goal of distributed validation is to calculate the metrics (loss, accuracy, etc.) per batch and per GPU, and then to weight and average them over the entire validation dataset.

For this, it is necessary to:

  1. Load the validation dataset in the same way as for the training datasets but without the randomized transformations such as data augmentation or shuffling (see the documentation on database loading in TensorFlow):
    # validation dataset loading (imagenet for example)                  
    val_dataset = tf.data.TFRecordDataset(glob.glob(os.environ['DSDIR']+'/imagenet/validation*'))
    # define distributed sharding for validation
    val_dataset = val_dataset.shard(hvd.size(), hvd.rank())
    # define dataloader for validation
    val_dataset = val_dataset.map(tf_parse_eval,
          num_parallel_calls=idr_tf.cpus_per_task).batch(batch_size).prefetch(buffer_size=5)
  2. Switch from the “learning” mode to the “validation” mode to deactivate certain functionalities specific to the training which are costly and not needed here:
    • model(images, training=False) during the model evaluation, to switch the model into “validation” mode and deactivate management of the dropouts, the batchnorms, etc.
    • Do not use tf.GradientTape(), in order to skip the gradient calculation
  3. Evaluate the model and calculate the metric per batch in the usual way. (Here, we are taking the example of the calculation of loss; it will be the same for the other metrics.):
    • logits_ = model(images, training=False) followed by loss_value = loss(labels, logits_)
  4. Weight and average the metric(s) per GPU:
    • val_loss.update_state(loss_value, sample_weight=images.shape[0]) with images.shape[0] as the batch size. Since the batches are not necessarily all the same size (the last batch is sometimes smaller), it is preferable to use the images.shape[0] value here (see the short numeric check after this list).
  5. Find the mean of the weighted metric averages of all the GPUs:
    • hvd.allreduce(val_loss.result()) to average the metric values calculated per GPU and communicate the result to all the GPUs. This operation performs inter-GPU communications. By default, Horovod's allreduce computes the mean.
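
To see why this weighting matters, here is a small numeric check with made-up values which reproduces by hand the weighted average computed by tf.keras.metrics.Mean:

# Toy check of the weighted average (made-up values): two batches on one GPU,
# the last batch being smaller than the others
losses      = [0.50, 0.80]   # mean loss of each batch
batch_sizes = [32, 8]        # values passed as sample_weight

weighted = sum(l * n for l, n in zip(losses, batch_sizes)) / sum(batch_sizes)
print(weighted)   # 0.56, whereas the unweighted mean would be 0.65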

Example after loading validation data:

loss = tf.losses.SparseCategoricalCrossentropy() # Define loss function
 
val_loss = tf.keras.metrics.Mean()               # Define Keras metrics
 
@tf.function                                     # Define validation step function
def eval_step(images, labels):
    logits_ = model(images, training=False)      # evaluate model - switch into validation mode     
    loss_value = loss(labels, logits_)           # compute loss
    val_loss.update_state(loss_value, sample_weight=images.shape[0]) # accumulate the weighted mean per GPU
 
val_loss.reset_states()                          # initialize val_loss value
 
for val_images, val_labels in val_dataset:
    eval_step(val_images, val_labels)            # Call eval_step function
 
val_loss_dist = hvd.allreduce(val_loss.result()) # Average weighted means and broadcast value to each GPU

Application example

An example is found on Jean Zay in $DSDIR/examples_IA/Horovod_parallel/Example_DataParallelism_Horovod_Tensorflow-eng.ipynb; it uses the MNIST database and a simple dense network. The example is a Notebook which allows you to create an execution script.

The notebook should be copied to your personal space (ideally in your $WORK):

$ cp $DSDIR/examples_IA/Horovod_parallel/Example_DataParallelism_Horovod_Tensorflow-eng.ipynb $WORK

You should then execute the Notebook from a Jean Zay front end after loading a TensorFlow module (see our JupyterHub documentation for more information on how to run a Jupyter Notebook).

Documentation and sources