Horovod: Multi-GPU and multi-node data parallelism
Horovod is a software library that enables data parallelism for TensorFlow, Keras, PyTorch, and Apache MXNet. Its objective is to make distributed code both efficient and easy to implement.
In examples from the AI community, Horovod is often used with TensorFlow to facilitate the implementation of data parallelism.
Horovod relies on the MPI and NCCL communication libraries, as well as the Intel Omni-Path (OPA) interconnection network, to exchange data between the devices (nodes, GPUs, CPUs).
Here we document the Horovod solutions in TensorFlow and in PyTorch on Jean Zay. This is a multi-process parallelism which works equally well on a single node and on multiple nodes.
Multi-process configuration with SLURM
One of the advantages of Horovod is that the distributed environment does not need to be configured manually. Horovod retrieves the information about the GPUs, the available machines and the communication protocols directly from the machine environment. In particular, this makes your code portable.
When you launch a script in SLURM with the srun command, the script is automatically distributed over all the reserved tasks. For example, if we reserve 4 eight-GPU nodes and request 3 GPUs per node, we obtain:
- 4 nodes, indexed from 0 to 3.
- 3 GPUs/node indexed from 0 to 2 on each node.
- 4 x 3 = 12 processes in total, allowing the execution of 12 tasks with the ranks from 0 to 11.
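The numbering above can be reproduced with a small pure-Python sketch. It assumes SLURM's default "block" distribution, in which the tasks of a node receive consecutive global ranks; in a Horovod script, the global rank and the local rank (GPU index on the node) would normally be what hvd.rank() and hvd.local_rank() return. The helper rank_layout is hypothetical and only illustrates the arithmetic.

```python
# Minimal sketch: rank layout for 4 nodes x 3 tasks (one task per GPU),
# assuming SLURM's default "block" distribution of tasks over nodes.

def rank_layout(num_nodes, tasks_per_node):
    """Return (node_index, local_rank, global_rank) for every task."""
    layout = []
    for node in range(num_nodes):
        for local_rank in range(tasks_per_node):
            # Block distribution: the tasks of a node get consecutive global ranks
            global_rank = node * tasks_per_node + local_rank
            layout.append((node, local_rank, global_rank))
    return layout


layout = rank_layout(num_nodes=4, tasks_per_node=3)
for node, local_rank, rank in layout:
    print(f"node {node}: local rank {local_rank} -> global rank {rank}")
```

The local rank is the value used to select one GPU per process on each node, while the global rank identifies the process across the whole job (for example, to save checkpoints on rank 0 only).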
The collective inter-node communications are managed by the NCCL library.
To execute distributed code with Horovod in the SLURM environment, you must reserve one task per GPU involved in the data parallelism.
Here are examples of SLURM script headers for:
- A reservation of N four-GPU nodes on the default GPU partition:
#SBATCH --nodes=N                # total number of nodes (N to be defined)
#SBATCH --ntasks-per-node=4      # number of tasks per node (here 4, i.e. 1 task per reserved GPU)
#SBATCH --gres=gpu:4             # number of GPUs reserved per node (here 4, i.e. all the GPUs of a node)
#SBATCH --cpus-per-task=10       # number of cores per task (here, we reserve 4x10 = 40 cores per node)
#SBATCH --hint=nomultithread     # use physical cores only (no hyperthreading)

# execute script
srun python script.py
Comment: Here, the nodes are reserved exclusively because we are reserving all the cores and GPUs. In particular, this gives access to the entire memory of each node.
- A reservation of N eight-GPU nodes on the gpu_p2 partition:
#SBATCH --partition=gpu_p2       # must be specified to use this partition
#SBATCH --nodes=N                # total number of nodes (N to be defined)
#SBATCH --ntasks-per-node=8      # number of tasks per node (here 8, i.e. 1 task per reserved GPU)
#SBATCH --gres=gpu:8             # number of GPUs per node (here 8, i.e. all the GPUs of a node)
#SBATCH --cpus-per-task=3        # number of cores per task (here, we reserve 8x3 = 24 cores per node)
#SBATCH --hint=nomultithread     # use physical cores only (no hyperthreading)

# execute script
srun python script.py
Comment: Here, the nodes are reserved exclusively because we are reserving all the cores and GPUs. In particular, this gives access to the entire memory of each node.
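The arithmetic behind these two headers can be checked with a small sketch. The node sizes below (4 GPUs and 40 cores for the default partition, 8 GPUs and 24 cores for gpu_p2) are taken from the totals stated in the headers; the helper check_request is hypothetical and simply verifies that there is one task per GPU and that the request covers all the GPUs and cores of a node.

```python
# Hypothetical helper: check a SLURM request against the node sizes quoted above.
NODE_TYPES = {
    "default": {"gpus": 4, "cores": 40},  # four-GPU nodes of the default GPU partition
    "gpu_p2": {"gpus": 8, "cores": 24},   # eight-GPU nodes of the gpu_p2 partition
}


def check_request(partition, ntasks_per_node, gpus_per_node, cpus_per_task):
    """Return (one_task_per_gpu, cores_per_node, whole_node) for a request."""
    node = NODE_TYPES[partition]
    one_task_per_gpu = ntasks_per_node == gpus_per_node
    cores_per_node = ntasks_per_node * cpus_per_task
    # The whole node is requested only if all its GPUs and all its cores are reserved
    whole_node = gpus_per_node == node["gpus"] and cores_per_node == node["cores"]
    return one_task_per_gpu, cores_per_node, whole_node


print(check_request("default", ntasks_per_node=4, gpus_per_node=4, cpus_per_task=10))  # -> (True, 40, True)
print(check_request("gpu_p2", ntasks_per_node=8, gpus_per_node=8, cpus_per_task=3))    # -> (True, 24, True)
print(check_request("default", ntasks_per_node=2, gpus_per_node=2, cpus_per_task=10))  # -> (True, 20, False)
```

The last call shows a partial reservation (2 GPUs and 20 cores out of 4 and 40), which would not give exclusive access to the node.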
Implementation of the Horovod solution
It is possible to use Horovod in the following software configurations:
- TensorFlow 2 and Keras - See the Horovod Keras documentation
- TensorFlow 2 alone - See the Horovod TensorFlow documentation
- PyTorch - See the Horovod PyTorch documentation
- TensorFlow 1 and Keras - See the Horovod Keras documentation
- TensorFlow 1 alone - See the Horovod TensorFlow documentation
Please refer to the Horovod documentation for the implementation details of each software configuration.
In each case, the code differs slightly but the development steps remain identical. It is necessary to:
- Import and initialize Horovod.
- Pin each GPU to a distinct process.
- Increase the learning rate proportionally to the number of GPUs to compensate for the increase in the batch size.
- Distribute the optimizer.
- Ensure that the variables are correctly replicated on all the GPUs at the beginning of the training (and after initialization of the model).
- Save the checkpoints on the process of rank 0 only.
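The learning-rate step can be illustrated with a minimal sketch, assuming each process (GPU) keeps the same per-GPU batch size as in single-GPU training, so that the global batch size grows with the number of GPUs. The function name and the per-GPU batch size of 64 are hypothetical; the base learning rate (0.001) and the 500 steps mirror the values used in the Keras script that follows.

```python
# Minimal sketch of the "linear scaling" of the learning rate with the number
# of GPUs, assuming a fixed per-GPU batch size.

def scale_for_data_parallelism(base_lr, per_gpu_batch_size, base_steps_per_epoch, num_gpus):
    # Each optimizer step now consumes one batch per GPU
    global_batch_size = per_gpu_batch_size * num_gpus
    # The learning rate grows proportionally to the number of GPUs
    scaled_lr = base_lr * num_gpus
    # Fewer steps are needed to process the same amount of data per epoch
    steps_per_epoch = base_steps_per_epoch // num_gpus
    return global_batch_size, scaled_lr, steps_per_epoch


global_bs, lr, steps = scale_for_data_parallelism(0.001, 64, 500, 12)
print(global_bs, lr, steps)
```

This is the same logic as `tf.optimizers.Adam(0.001 * hvd.size())` and `steps_per_epoch=500 // hvd.size()` in the script, where hvd.size() is the number of processes.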
For example, the Horovod solution implemented in a TensorFlow 2 + Keras code is shown in the following script:
import tensorflow as tf
import horovod.tensorflow.keras as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Build model and dataset
dataset = ...
model = ...
# Horovod: scale the learning rate by the number of processes (GPUs)
opt = tf.optimizers.Adam(0.001 * hvd.size())

# Horovod: add Horovod DistributedOptimizer.
opt = hvd.DistributedOptimizer(opt)

# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
model.compile(loss=tf.losses.SparseCategoricalCrossentropy(),
              optimizer=opt,
              metrics=['accuracy'],
              experimental_run_tf_function=False)

callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
]

# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))

model.fit(dataset,
          steps_per_epoch=500 // hvd.size(),
          callbacks=callbacks,
          epochs=24,
          verbose=1 if hvd.rank() == 0 else 0)
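Why does the distributed optimizer give the same update as single-GPU training on the global batch? The following pure-Python simulation (no Horovod or TensorFlow needed) is a conceptual model, not Horovod's implementation: each "process" computes the gradient of a mean squared error on its own equal-size shard, and the gradients are then averaged, which is what an allreduce with the mean operation does. The weight w is identical on every process, which is what the broadcast from rank 0 (BroadcastGlobalVariablesCallback(0)) guarantees at the start of training. All names are hypothetical.

```python
# Conceptual simulation of data-parallel gradient averaging (pure Python).

def mse_gradient(w, xs, ys):
    """Gradient with respect to w of mean((w * x - y) ** 2) over the samples."""
    return sum(2.0 * x * (w * x - y) for x, y in zip(xs, ys)) / len(xs)


def allreduce_mean(values):
    """Stand-in for an allreduce with the mean operation: one value per process."""
    return sum(values) / len(values)


xs = [float(i) for i in range(12)]   # global batch of 12 samples
ys = [3.0 * x + 1.0 for x in xs]     # targets of a toy linear model
w = 0.5                              # current weight, identical on every process
num_processes = 4

# Each process receives an equal-size shard (3 samples) of the global batch
shards = [(xs[rank::num_processes], ys[rank::num_processes]) for rank in range(num_processes)]
local_grads = [mse_gradient(w, sx, sy) for sx, sy in shards]

distributed_grad = allreduce_mean(local_grads)  # value every process ends up with
single_gpu_grad = mse_gradient(w, xs, ys)       # gradient on the whole batch at once
print(distributed_grad, single_gpu_grad)
```

Because the shards have the same size, the mean of the per-shard mean gradients equals the mean gradient over the whole batch, so every process applies the same update as a single GPU processing the global batch.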
Distributed validation
The validation step, executed after each epoch or after a set number of training iterations, can be distributed over all the GPUs involved in training the model. When data parallelism is used and the validation dataset is large, distributing the validation over the GPUs is generally the most efficient and fastest solution.
The goal of distributed validation is to calculate the metrics (loss, accuracy, etc.) per batch and per GPU, and then to weight and average them over the entire validation dataset.
For this, it is necessary to:
- Load the validation dataset in the same way as for the training datasets but without the randomized transformations such as data augmentation or shuffling (see the documentation on database loading in TensorFlow):
import glob
import os

# validation dataset loading (imagenet for example)
val_dataset = tf.data.TFRecordDataset(glob.glob(os.environ['DSDIR'] + '/imagenet/validation*'))
# define distributed sharding for validation: each process keeps 1 record out of hvd.size()
val_dataset = val_dataset.shard(hvd.size(), hvd.rank())
# define dataloader for validation (no shuffling, no data augmentation)
# tf_parse_eval, batch_size and idr_tf are defined elsewhere in the training script
val_dataset = val_dataset.map(tf_parse_eval, num_parallel_calls=idr_tf.cpus_per_task).batch(batch_size).prefetch(buffer_size=5)
- Switch from the “learning” mode to the “validation” mode to deactivate certain functionalities specific to the training which are costly and not needed here:
  - Call model(images, training=False) during the model evaluation to switch the model into “validation” mode, which deactivates dropout, puts batch normalization in inference mode, etc.
  - Do not use tf.GradientTape(), so that no gradients are computed.
- Evaluate the model and calculate the metric per batch in the usual way (here, we take the example of the loss; the procedure is the same for the other metrics):
logits_ = model(images, training=False)
loss_value = loss(labels, logits_)
- Weight and average the metric(s) per GPU:
val_loss.update_state(loss_value, sample_weight=images.shape[0])
where images.shape[0] is the batch size. Since the batches are not necessarily all the same size (the last batch is sometimes smaller), it is preferable to use the actual images.shape[0] value here rather than a fixed batch size.
- Average the weighted per-GPU means over all the GPUs:
hvd.allreduce(val_loss.result())
This averages the metric values calculated on each GPU and communicates the result to all the GPUs, which requires inter-GPU communication. By default, Horovod's allreduce performs a mean (average) operation.
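The steps above can be simulated in pure Python (no TensorFlow or Horovod) on 10 hypothetical per-sample losses split across 4 GPUs with a batch size of 2. The shard function follows the documented selection rule of tf.data.Dataset.shard (element i is kept if i % num_shards == index), and weighted_mean_per_gpu mimics val_loss.update_state(..., sample_weight=batch size). The sketch also highlights a subtlety: with the default mean operation, the allreduce gives every GPU's weighted mean the same weight, so when the shards have different sizes (here 3, 3, 2 and 2 samples) the result differs slightly from the true mean over the whole validation set. Summing the weighted totals and the sample counts across GPUs before dividing gives the exact value; in Horovod this would correspond to an allreduce with a sum operation (for example `op=hvd.Sum`, to be checked against your installed Horovod version).

```python
# Pure-Python simulation of distributed validation; all names are hypothetical.

def shard(data, num_shards, index):
    """Keep every element i with i % num_shards == index, like tf.data.Dataset.shard."""
    return data[index::num_shards]


def weighted_mean_per_gpu(samples, batch_size):
    """Mimic val_loss.update_state(loss_value, sample_weight=batch size) over all batches."""
    total, weight = 0.0, 0
    for start in range(0, len(samples), batch_size):
        batch = samples[start:start + batch_size]  # the last batch may be smaller
        loss_value = sum(batch) / len(batch)       # per-batch mean loss
        total += loss_value * len(batch)           # weighted by the actual batch size
        weight += len(batch)
    return total / weight, weight


per_sample_losses = [float(i * i) for i in range(10)]  # 10 validation samples
num_gpus, batch_size = 4, 2

results = [weighted_mean_per_gpu(shard(per_sample_losses, num_gpus, rank), batch_size)
           for rank in range(num_gpus)]
means = [m for m, _ in results]
counts = [n for _, n in results]

# Like hvd.allreduce(val_loss.result()) with the default mean: every GPU counts equally
allreduce_of_means = sum(means) / num_gpus
# Exact alternative: sum weighted totals and sample counts across GPUs, then divide
exact_mean = sum(m * n for m, n in results) / sum(counts)
print(allreduce_of_means, exact_mean, sum(per_sample_losses) / len(per_sample_losses))
```

When the number of validation samples is a multiple of the number of GPUs, all shards have the same size and the two results coincide; otherwise the difference is usually small for a large validation set.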
Example, after loading the validation data:
loss = tf.losses.SparseCategoricalCrossentropy()  # Define loss function
val_loss = tf.keras.metrics.Mean()                # Define Keras metric

@tf.function  # Define validation step function
def eval_step(images, labels):
    logits_ = model(images, training=False)  # evaluate model - switch into validation mode
    loss_value = loss(labels, logits_)       # compute loss
    val_loss.update_state(loss_value, sample_weight=images.shape[0])  # accumulate weighted mean per GPU

val_loss.reset_states()  # initialize val_loss value
for val_images, val_labels in val_dataset:
    eval_step(val_images, val_labels)  # Call eval_step function
val_loss_dist = hvd.allreduce(val_loss.result())  # Average weighted means and broadcast value to each GPU
Application example
An example is available on Jean Zay in $DSDIR/examples_IA/Horovod_parallel/Example_DataParallelism_Horovod_Tensorflow-eng.ipynb, using the MNIST database and a simple dense network. The example is a notebook that can be used to create an execution script.
It should be copied into your personal space (ideally your $WORK):
$ cp $DSDIR/examples_IA/Horovod_parallel/Example_DataParallelism_Horovod_Tensorflow-eng.ipynb $WORK
Then execute the notebook from a Jean Zay front-end node, after loading a TensorFlow module (see our JupyterHub documentation for more information on how to run Jupyter Notebook).