TensorFlow: Database loading for distributed learning of a model

This page describes the management of Datasets and DataGenerators for distributed model training with TensorFlow. We focus here on the issues introduced on the main page on data loading.

On this page, we describe the usage of:

  • Datasets, both pre-defined and customised,
  • transformations of the input data (the map and cache functions),
  • the configuration and optimisation of the preprocessing steps (shuffling, data distribution over workers, batching, prefetching).

The page concludes with a complete example of optimised data loading, and its implementation on Jean Zay via a Jupyter Notebook.

Preliminary comment: This documentation is limited to the TensorFlow 2.x release.

Datasets

In TensorFlow, database manipulation is handled by an object of the tf.data.Dataset type.

Pre-defined datasets in TensorFlow

TensorFlow offers a collection of ready-to-use databases in the tensorflow-datasets module. Some of them are already available in the Jean Zay $DSDIR common space, in the tensorflow_datasets directory. When loading, you can separate the data dedicated to training from the data dedicated to validation by using the split option. For example, for the ImageNet2012 database:

import os
import tensorflow as tf
import tensorflow_datasets as tfds
# load imagenet2012 dataset from DSDIR
tfds_dir = os.environ['DSDIR'] + '/tensorflow_datasets/'
# load data for training
dataset = tfds.load('imagenet2012',
                    data_dir=tfds_dir,
                    split='train')
# load data for testing
dataset_test = tfds.load('imagenet2012',
                         data_dir=tfds_dir,
                         split='test')

Each loading function then offers functionalities specific to the database (data quality, partial data extraction, etc.). For more information, please consult the official documentation.

Comments:

  • The tensorflow-datasets database collection is in the TFRecord format.
  • Certain functions allow downloading the databases online via the download=True argument. We remind you that the Jean Zay compute nodes do not have internet access; such operations must therefore be done from a front end or from a pre-/post-processing node.
  • The $DSDIR/tensorflow_datasets directory can be augmented upon request to the IDRIS support team (assist@idris.fr).

Customised datasets

There are different ways to generate a tf.data.Dataset object by reading input files. The most commonly used are:

  • tf.data.Dataset.list_files() for reading standard formats:
    import os
    import tensorflow as tf
    # locate Places365 dataset in DSDIR
    places365_dir = os.environ['DSDIR'] + '/Places365-Standard/data_256/'
    # extract images from places beginning with "a"
    dataset = tf.data.Dataset.list_files(places365_dir + 'a/*.jpg')

    By default, the list_files function shuffles the order of the files. To deactivate this functionality, you can specify the shuffle=False argument.

Comment: The list_files function applies the Python glob function to its input parameter. If you already have the list of paths to the input files (obtained, for example, by a glob), it is preferable to create the Dataset with the tf.data.Dataset.from_tensor_slices(glob_list) function. For a randomised shuffling of the files, it is then necessary to apply a function to the list of paths beforehand, such as random.Random(seed).shuffle(glob_list). If there is a call to the .shard() function (see below), the value of "seed" must be the same on each worker (see the sketch after this list).

  • tf.data.TFRecordDataset() for reading files in the TFRecord format:
    import os
    import tensorflow as tf
    # locate imagenet dataset in DSDIR
    imagenet_path = os.environ['DSDIR'] + '/imagenet/'
    # list the TFRecord files containing the training data
    train_files = tf.io.gfile.glob(imagenet_path + 'train*')
    # extract training data from dataset
    dataset = tf.data.TFRecordDataset(train_files)

    Comments:

    • You can create your files in the TFRecord format yourself from a database by following this tutorial. The $DSDIR directory can also be enriched with Datasets in the TFRecord format upon request to the IDRIS support team assist@idris.fr.
    • It is not possible to know the size of a Dataset loaded from a TFRecord by using the len(dataset) command (this returns an error). The information must be recovered from the source database which was used to create the TFRecord.
    • If your Dataset is partitioned into a number of TFRecords, it is advised to use the interleave() function when reading the Dataset. The partitioned TFRecords are then read concurrently, and therefore faster. This also avoids read redundancy between the different processes when using data parallelism.
      dataset = tf.data.Dataset.list_files(pattern)
      dataset = dataset.interleave(tf.data.TFRecordDataset,
                                   num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)

      Moreover, the num_parallel_calls=tf.data.AUTOTUNE option activates multithreading. The number of CPUs solicited is determined automatically according to the available CPUs. The deterministic=False option improves processing performance when the order of operations is not important.
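
To complement the comment on tf.data.Dataset.from_tensor_slices() above, here is a minimal sketch which builds the Dataset from a list of paths shuffled with a fixed seed (the Places365 path and the seed value are taken from the complete example at the end of this page):

import os
import glob
import random
import tensorflow as tf

# list the input files ourselves (equivalent to what list_files does internally)
glob_list = glob.glob(os.environ['DSDIR'] + '/Places365-Standard/data_256/a/*.jpg')
# shuffle the list with a fixed seed so that every worker sees the same order
random.Random(123).shuffle(glob_list)
# create the Dataset directly from the list of paths
dataset = tf.data.Dataset.from_tensor_slices(glob_list)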

Transformations

The map function

Input data can be transformed by using the tf.data.Dataset.map function. The transformation function is defined by the user and then passed to the map function. For example, to resize the images of the ImageNet2012 database:

# define resizing function
def img_resize(x):
    return tf.image.resize(x, [300, 300])

# apply transformation to dataset
dataset = dataset.map(img_resize,
                      num_parallel_calls=tf.data.AUTOTUNE,
                      deterministic=False)

Comments:

  • The num_parallel_calls=tf.data.AUTOTUNE option activates multithreading for the transformation step. The number of CPUs solicited is determined automatically according to the available CPUs.
  • The deterministic=False option improves processing performance when the order of operations is not important.

The cache function

Certain transformation operations are deterministic (data reading, normalization, resizing, …): that is, they generate the same transformed data each time. Others are linked to data augmentation and are randomised. Deterministic transformations generally represent heavy processing which generates low-volume data. This transformed data can be stored in a memory cache so that the transformation is not needlessly repeated at each epoch. For this, we use the cache() function:

# define random transformation
def random_trans(x):
    return ...

# define deterministic transformation
def determ_trans(x):
    return ...

# apply deterministic transformation to dataset and store them in cache
dataset = dataset.map(determ_trans, num_parallel_calls=tf.data.AUTOTUNE).cache()
# apply random transformation to dataset at each epoch
dataset = dataset.map(random_trans, num_parallel_calls=tf.data.AUTOTUNE)

By default, the cache memory used is the RAM of the CPU node. It is also possible to store the transformed data in an external file. For this, you just need to specify the path to the storage file when calling the cache() function: .cache(path/to/file) (a sketch is given after the comment below).

Comment:

  • The cache() function is useful if the RAM of the CPU node is able to hold the entire database. The time gained by caching to a storage file with .cache(path/to/file) is limited, except in the extreme case where the data loading and the deterministic transformations are a major bottleneck in the training loop.
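
For illustration, here is a minimal sketch of caching to a storage file rather than to RAM (the location of the cache file under $SCRATCH is an assumption; adapt it to your own storage layout):

import os

# cache the output of the deterministic transformations to a file on disk
# (the path under $SCRATCH is an illustrative assumption)
cache_file = os.environ['SCRATCH'] + '/dataset_cache'
dataset = dataset.map(determ_trans, num_parallel_calls=tf.data.AUTOTUNE).cache(cache_file)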

Configuration and optimisation of the preprocessing steps

Random processing of input data

The shuffle() function enables randomly shuffling the data at each epoch during training. This function takes as input argument the buffer_size, which sets the size of the memory buffer used for this operation. Ideally, the buffer should be able to contain the entire database in order to avoid any bias in the shuffling. In practice, a database is often too large and a buffer of reduced size (1 000 or 10 000 pointers) saves memory.
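
For example, with a reduced-size buffer (the value of 10 000 is an illustrative choice):

# shuffle the data with a buffer of 10 000 elements, re-shuffled at each epoch
dataset = dataset.shuffle(buffer_size=10000, reshuffle_each_iteration=True)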

Data distribution on more than one process for a distributed learning

To distribute the data over multiple processes (or workers) for distributed learning, as must be done with Horovod, you need to use the shard() function. This function takes as input arguments the number of workers and the global index of the current worker. These values can be recovered from the Slurm computing configuration as follows:

import tensorflow as tf
import idr_tf # IDRIS package available in all TensorFlow modules
 
# load dataset
dataset = ...
 
# get number of processes/workers
num_workers = idr_tf.size    # or hvd.size() if Horovod is used
worker_index = idr_tf.rank   # or hvd.rank() if Horovod is used
 
# distribute dataset
dataset = dataset.shard(num_workers,worker_index)

Here we are using the idr_tf library to recover the information relative to the Slurm computing environment. This library, developed by IDRIS, is present in all of the TensorFlow modules on Jean Zay. If Horovod is used, it is possible to get the same information with Horovod.

If you use the tf.distribute.MultiWorkerMirroredStrategy distribution strategy, Autosharding is enabled by default; you should therefore not use .shard(). If you would like to disable Autosharding, please read the TensorFlow documentation.
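
If you do need to disable it, here is a minimal sketch based on tf.data.Options (check the TensorFlow documentation mentioned above for your TensorFlow version):

# disable automatic sharding for a MultiWorkerMirroredStrategy pipeline
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.OFF
dataset = dataset.with_options(options)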

Comments:

  • By calling the shard function, each worker reads only a part of the database.
  • In a distributed configuration, it is important to perform the shuffling step after the distribution step; otherwise, the entire database will be read by each worker and performance will be poorer. In general, the shard function must be applied as early as possible.

Optimisation of resource usage during the learning

The batch size is defined by the batch() function. A batch size is optimal if it enables a good usage of the computing resources, that is, if the memory of each GPU is maximally solicited and the workload is divided equitably between the GPUs. The drop_remainder=True option allows ignoring the last batch if its size is smaller than the requested batch size. The batch() function is always called after the shuffle() function in order to obtain global batches at each epoch.
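
For example (the batch size of 64 is the illustrative value used in the complete example below):

batch_size = 64
# shuffle first, then batch; drop the last incomplete batch
dataset = dataset.shuffle(buffer_size=10000).batch(batch_size, drop_remainder=True)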

Transfer/computation overlapping

It is possible to optimise the batch transfers from CPU to GPU by generating transfer/computation overlapping. The prefetch() functionality enables pre-loading the next batches to be processed during the training. The quantity of pre-loaded batches is controlled by the buffer_size argument. For an automatic definition of the buffer_size, it is possible to specify buffer_size=tf.data.AUTOTUNE.
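
For example, with an automatically sized buffer:

# pre-load the next batches while the GPUs are computing on the current one
dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)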

Notes

  • The .repeat() function enables repeating a Dataset indefinitely in a data generator, or repeating it for a given number of epochs with .repeat(num_epoch); it is optional. The absence of this function corresponds to a .repeat(1). It can be useful if we wish to parameterise a number of training iterations rather than a number of epochs (see the sketch after these notes). It can be called anywhere in the chain and the result will be the same.
  • If the database contains corrupted data which generate an error, it is possible to ignore the errors by using the dataset = dataset.apply(tf.data.experimental.ignore_errors()) command, applied after the .map().
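
As an illustration of the first note, here is a minimal sketch of a training loop parameterised by a number of iterations rather than a number of epochs (num_steps and train_step are illustrative assumptions, in the spirit of the complete example below):

num_steps = 10000   # total number of training iterations (illustrative value)
# repeat the Dataset indefinitely and stop after num_steps batches
dataset = dataset.repeat()
for step, (label, img) in enumerate(dataset.take(num_steps)):
    train_step(label, img)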

Complete example of optimised data loading

The following is a complete example of optimised loading of the Places365 database for distributed learning on Jean Zay:

import tensorflow as tf
import idr_tf # IDRIS package available in all TensorFlow modules
import os
import glob
import random
 
IMG_WIDTH=320
IMG_HEIGHT=320
def decode_img(file_path):
    # parse label
    label = tf.strings.split(file_path, sep='/')[-2]
    # read input file
    img = tf.io.read_file(file_path)
    # decode jpeg format (channel=3 for RGB, channel=0 for Grayscale)
    img = tf.image.decode_jpeg(img, channels=3)
    # convert to [0,1] format for TensorFlow compatibility
    img = tf.image.convert_image_dtype(img, tf.float32)
    # resize image
    return label, tf.image.resize(img, [IMG_WIDTH, IMG_HEIGHT])
# Create a generator
rng = tf.random.Generator.from_seed(123, alg='philox')
def randomized_preprocessing(label, img):
    # draw a random contrast factor in [1,2) per image - Data Augmentation
    contrast_factor = tf.random.stateless_uniform([], seed=rng.make_seeds(2)[0],
                                                  minval=1.0, maxval=2.0)
    img = tf.image.adjust_contrast(img, contrast_factor=contrast_factor)
    img = tf.image.stateless_random_flip_left_right(img, rng.make_seeds(2)[0])
    return label, img
# configuration
num_epochs = 3
batch_size = 64
shuffling_buffer_size = 5000
num_parallel_calls = tf.data.AUTOTUNE
prefetch_factor = tf.data.AUTOTUNE
# locate Places365 dataset in DSDIR and list them
places365_path = glob.glob(os.environ['DSDIR']+"/Places365-Standard/data_256/**/*.jpg", recursive=True)
random.Random(123).shuffle(places365_path)
# create a dataset object from path
dataset = tf.data.Dataset.from_tensor_slices(places365_path)
# distribute dataset
num_workers = idr_tf.size
worker_index = idr_tf.rank
dataset = dataset.shard(num_workers,worker_index).shuffle(shuffling_buffer_size)
# deterministic transformation
dataset = dataset.map(decode_img, num_parallel_calls=num_parallel_calls, deterministic=False)
# cache function only relevant for small to medium datasets
dataset = dataset.cache()
# random transformations
dataset = dataset.map(randomized_preprocessing, num_parallel_calls=num_parallel_calls, deterministic=False)
dataset = dataset.batch(batch_size, drop_remainder=True).prefetch(prefetch_factor)
## Repeat dataset num_epochs times
#dataset = dataset.repeat(num_epochs)
#for label, img in dataset:
#    train_step(label, img)   
## equivalent to:  
for epoch in range(num_epochs):
    for label, img in dataset:
        train_step(label, img)

The following is an example of the loading chain when loading a single global TFRecord file:

dataset = tf.data.TFRecordDataset(input_file)
dataset = dataset.shard(num_workers, worker_index)
dataset = dataset.shuffle(shuffle_buffer_size)
dataset = dataset.map(parser_fn, num_parallel_calls=num_map_threads)
dataset = dataset.batch(batch_size, drop_remainder=True).prefetch(prefetch_factor)
dataset = dataset.repeat(num_epochs)

Lastly, here is an example of the loading chain when loading a Dataset partitioned into a number of TFRecords:

dataset = tf.data.Dataset.list_files(pattern)
dataset = dataset.shard(num_workers, worker_index)
dataset = dataset.shuffle(shuffle_buffer_size)
dataset = dataset.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
dataset = dataset.map(parser_fn, num_parallel_calls=num_map_threads)
dataset = dataset.batch(batch_size, drop_remainder=True).prefetch(prefetch_factor)
dataset = dataset.repeat(num_epochs)

Implementation on Jean Zay

In order to implement the above documentation and get an idea of the gains brought by each of the functionalities offered by the TensorFlow Dataset API, you can recover the Jupyter Notebook, notebook_data_preprocessing_tensorflow-eng.ipynb, from the DSDIR. For example, to copy it into your WORK space:

$ cp $DSDIR/examples_IA/Tensorflow_parallel/notebook_data_preprocessing_tensorflow-eng.ipynb $WORK

You can also download the Notebook here.

Then you can open and execute the Notebook from the IDRIS JupyterHub platform. To use JupyterHub, please refer to the corresponding documentation: Jean Zay: Access to JupyterHub and https://jupyterhub.idris.fr/services/documentation/.

Creation of TFRecord files and their usage are also addressed here.

Official documentation

  • https://www.tensorflow.org/guide/data
  • https://www.tensorflow.org/guide/data_performance