TensorFlow : Chargement de bases de données pour l’apprentissage distribué d’un modèle

Dans cette page, nous mettons en pratique la gestion des Datasets et DataGenerators pour l'apprentissage distribué de modèle TensorFlow. Nous nous intéressons aux problématiques présentées dans la page mère sur le chargement des données.

Nous présentons ici l'usage :

La documentation se conclut sur la présentation d'un exemple complet de chargement optimisé de données, et sur une mise en pratique sur Jean Zay via un Jupyter Notebook.

Remarque préliminaire : dans cette documentation, nous nous limiterons à la release TensorFlow 2.x.

Datasets

En TensorFlow, la manipulation des bases de données est gérée par un objet de type tf.data.Dataset.

Datasets prédéfinis dans TensorFlow

TensorFlow fournit un ensemble de bases de données prêtes à l’emploi dans le module tensorflow-datasets. Certaines d’entre elles sont déjà disponibles dans l’espace commun $DSDIR de Jean Zay, dans le répertoire tensorflow_datasets.

Au chargement, il est possible de différencier les données dédiées à l’entraînement de celles dédiées à la validation, grâce à l’option split.

Par exemple, pour la base ImageNet2012 :

import tensorflow as tf
import tensorflow_datasets as tfds
 
# load imagenet2012 dataset from DSDIR
tfds_dir = os.environ['DSDIR'] + '/tensorflow_datasets/'
 
# load data for training
dataset = tfds.load('imagenet2012', 
                    data_dir=tfds_dir, 
                    split='train')
 
# load data for testing
dataset_test = tfds.load('imagenet2012', 
                    data_dir=tfds_dir, 
                    split='test')

Chaque fonction de chargement propose ensuite des fonctionnalités spécifiques aux bases de données (qualité des données, extraction d’une sous-partie des données, etc). Nous vous invitons à consulter la documentation officielle pour plus de détails.

Remarques :

  • les bases de données de la collection tensorflow-datasets sont au format TFRecord.
  • certaines fonctions proposent de télécharger les bases données en ligne grâce à l’argument download=True. Nous vous rappelons que les nœuds de calcul Jean Zay n’ont pas accès à internet et que de telles opérations doivent donc se faire en amont depuis une frontale ou un nœud de pré/post-traitement.
  • le répertoire $DSDIR/tensorflow_datasets peut être enrichi sur demande auprès de l’assistance IDRIS (assist@idris.fr).

Datasets personnalisés

Il existe différentes manières de générer un objet de type tf.data.Dataset à partir de la lecture de fichiers d’entrée. Les plus courantes sont :

  • tf.data.Dataset.list_files() pour la lecture de formats standards :
    import tensorflow as tf
     
    # locate Places365 dataset in DSDIR
    places365_dir = os.environ['DSDIR']+'/Places365-Standard/data_256/'
     
    # extract images from places beginning by ''a''
    dataset = tf.data.Dataset.list_files(places365_dir + 'a/*.jpg')

    Par défaut, la fonction list_files organise les fichiers de manière aléatoire. Pour désactiver cette fonctionnalité, vous pouvez spécifier l’argument shuffle=False.
    Remarque : la fonction list_files applique la fonction Python glob sur son paramètre d'entrée. Si vous possédez déjà la liste des chemins vers les fichiers d’entrée (grâce à un glob par exemple), il est préférable alors d’extraire les données avec la fonction tf.data.Dataset.from_tensor_slices(glob_list). Pour mélanger aléatoirement les fichiers, il faudra préalablement appliquer sur la liste des chemins une fonction comme random.Random(seed).shuffle(glob_list). Dans le cas d'un appel à la fonction .shard() (voir ci-après), la valeur de seed doit être la même sur chaque worker.

  • tf.data.TFRecordDataset() pour la lecture de fichiers au format TFRecord :
    import tensorflow as tf
     
    # locate imagenet dataset in DSDIR
    imagenet_path = os.environ['DSDIR']+'/imagenet/'
     
    # extract training data from dataset
    dataset = tf.data.TFRecordDataset(imagenet_path + 'train*'))

    Remarques :

    • Vous pouvez créer vous-même vos fichiers au format TFRecord à partir d'une base de données en suivant le tutoriel suivant. Le répertoire $DSDIR peut être aussi enrichi de Dataset au format TFRecord sur demande auprès de l’assistance IDRIS assist@idris.fr.
    • Il n'est pas possible de connaître la taille d'un Dataset chargé à partir d'un TFRecord avec la commande len(dataset) (cela retourne une erreur). Il faudra récupérer l'information à partir de la base de données source qui a permis de créer le TFRecord.
    • Si votre Dataset est partitionné en plusieurs fichiers TFRecord, il est conseillé d'utiliser la fonction interleave() lors de la lecture du Dataset. Les TFRecords partitionnés sont alors lus en parallèle, donc plus rapidement. Cela permet aussi d'éviter la redondance de lecture entre les différents processus lorsque l'on utilise le parallélisme de données.
      dataset = tf.data.Dataset.list_files(pattern)
      dataset = dataset.interleave(tf.data.TFRecordDataset,
                                   num_parallel_calls=tf.data.AUTOTUNE, deterministic=False)

      L’option num_parallel_calls=tf.data.AUTOTUNE permet en plus d’activer le multithreading. Le nombre de CPU sollicités est déterminé automatiquement en fonction des CPU disponibles. L’option deterministic=False permet d’améliorer les performances de traitement lorsque l’ordre des opérations n’est pas important.

Transformations

Fonction Map

Les données d’entrée peuvent être transformées grâce à la fonction tf.data.Dataset.map. La fonction de transformation est définie par l’utilisateur puis communiquée à la fonction map. Par exemple, pour redimensionner les images de la base de données ImageNet2012 :

# define resizing function
def img_resize(x) :
	return tf.image.resize(x,[300,300])
 
# apply transformation to dataset
dataset = dataset.map(img_resize, 
                      num_parallel_calls=tf.data.AUTOTUNE, 
                      deterministic=False)

Remarques :

  • l’option num_parallel_calls=tf.data.AUTOTUNE permet d’activer le multithreading pour l’étape de transformation. Le nombre de CPU sollicités est déterminé automatiquement en fonction des CPU disponibles.
  • l’option deterministic=False permet d’améliorer les performances de traitement lorsque l’ordre des opérations n’est pas important.

Fonction Cache

Certaines opérations de transformation sont déterministes (lecture des données, normalisation, redimensionnement, …), c’est-à-dire qu’elles génèrent la même donnée transformée à chaque fois, alors que d’autres sont liées à la Data Augmentation et sont aléatoires. Les transformations déterministes représentent en générale des traitements lourds générant des données de faible volume. Ces données transformées peuvent être stockées dans un cache mémoire afin de ne pas répéter la transformation inutilement à chaque époque. Pour cela, on utilise la fonction cache() :

# define random transformation
def random_trans(x) :
	return ...
 
# define deterministic transformation
def determ_trans(x) :
	return ...
 
# apply deterministic transformation to dataset and store them in cache
dataset = dataset.map(determ_trans, num_parallel_calls=tf.data.AUTOTUNE).cache()
 
# apply random transformation to dataset at each epoch
dataset = dataset.map(random_trans, num_parallel_calls=tf.data.AUTOTUNE)

Par défaut, la mémoire cache utilisée est la mémoire RAM du nœud CPU. Il est également possible de stocker les données transformées dans un fichier externe. Pour cela, il suffit de spécifier le chemin vers le fichier de stockage lors de l’appel à la fonction cache() : .cache(path/to/file).

Remarque:

  • la fonction cache() est utile si la mémoire RAM du CPU est capable d'accueillir toute la base de données. Le gain de temps avec un .cache(path/to/file) vers un fichier de stockage est limité sauf dans le cas extrême où le chargement des données et leurs transformations déterministes seraient un point de congestion important dans la boucle d'apprentissage.

Configuration et optimisation des étapes de prétraitement

Traitement aléatoire des données d’entrée

La fonction shuffle() permet d’activer le traitement de tri aléatoire des données, à chaque époque, lors de l'apprentissage. Cette fonction prend en entrée l’argument buffer_size qui fixe la taille du buffer mémoire utilisé pour cette opération. Idéalement, le buffer mémoire devrait pouvoir contenir l’ensemble de la base de données afin d’éviter tout biais dans le traitement aléatoire. En pratique, une base de données est souvent trop volumineuse et un buffer de taille réduite (1000 ou 10000 pointeurs) permet de s’affranchir de la contrainte mémoire.

Distribution des données sur plusieurs processus en vu d’un apprentissage distribué

Pour distribuer les données sur plusieurs processus (ou workers) en vu d’un apprentissage distribué, comme cela doit être fait avec Horovod, il faut utiliser la fonction shard(). Cette fonction prend en argument d’entrée le nombre de workers et l’indice global du worker. Ces valeurs peuvent être récupérées à partir de la configuration de calcul Slurm de la manière suivante :

import tensorflow as tf
import idr_tf # IDRIS package available in all TensorFlow modules
 
# load dataset
dataset = ...
 
# get number of processes/workers
num_workers = idr_tf.size    # or hvd.size() if Horovod is used
worker_index = idr_tf.rank   # or hvd.rank() if Horovod is used
 
# distribute dataset
dataset = dataset.shard(num_workers,worker_index)

On utilise ici la librairie idr_tf pour récupérer les informations relatives à l'environnement de calcul Slurm. Cette librairie développée par l’IDRIS est présente dans l’ensemble des modules TensorFlow sur Jean Zay. Si Horovod est utilisé, il est possible de récupérer les mêmes informations avec Horovod.

Si l'on utilise la stratégie de distribution tf.distribute.MultiWorkerMirroredStrategy, l' Autosharding est paramétré par défaut. Il ne faut donc pas utiliser le .shard(). Si vous désirez désactiver l'Autosharding, veuillez lire la documentation Tensorflow.

Remarques :

  • en appelant la fonction shard, chaque worker ne lit qu’une partie de la base de données ;
  • en configuration distribuée, il est important d’effectuer l’étape de shuffling après l’étape de distribution. Sinon, la base de données sera lue entièrement par chaque worker et les performances seront appauvries. De manière générale, la fonction shard doit être appliquée le plus tôt possible.

Optimisation de l’utilisation des ressources lors de l’apprentissage

La taille de batch est définie par la fonction batch(). Une taille de batch est optimale si elle permet une bonne utilisation des ressources de calcul, c’est-à-dire si la mémoire de chaque GPU est sollicitée au maximum et que la charge de travail est répartie équitablement entre les GPU. L'option drop_remainder=True permet d'ignorer le dernier batch si sa taille est inférieure à la taille de batch demandée. La fonction batch est toujours appelée après la fonction shuffle, afin d'obtenir des batches uniques à chaque époque.

Recouvrement transfert/calcul

Il est possible d’optimiser les transferts de batch du CPU vers le GPU en générant du recouvrement transfert/calcul. La fonctionnalité prefetch() permet de pré-charger les prochains batches à traiter pendant l’entraînement. La quantité de batches pré-chargés est contrôlée par l’argument buffer_size. Pour une définition automatique du buffer_size, il est possible de définir buffer_size=tf.data.AUTOTUNE.

Notes

  • La fonction .repeat() permet de répéter indéfiniment un Dataset dans un DataGenerator, ou de le répéter sur le nombre donné d'époques avec .repeat(num_epoch). Elle est optionnelle : l'absence de cette fonction correspond à un .repeat(1). Elle peut être utile si on souhaite paramétrer un nombre d'itérations d'apprentissage plutôt qu'un nombre d'époques. Elle est appelable n'importe où dans la chaîne, le résultat restera le même.
  • Si la base de données possède des données corrompues qui génèrent une erreur, il est possible d'ignorer les erreurs avec la commande dataset = dataset.apply(tf.data.experimental.ignore_errors()), à appliquer après le .map().

Exemple complet de chargement de données optimisé

Voici un exemple complet de chargement optimisé de la base de données ImageNet pour un apprentissage distribué sur Jean Zay :

import tensorflow as tf
import idr_tf # IDRIS package available in all TensorFlow modules
import os
import glob
import random
 
IMG_WIDTH=320
IMG_HEIGHT=320
def decode_img(file_path):
    # parse label
    label = tf.strings.split(file_path, sep='/')[-2]
    # read input file
    img = tf.io.read_file(file_path)
    # decode jpeg format (channel=3 for RGB, channel=0 for Grayscale)
    img = tf.image.decode_jpeg(img, channels=3)
    # convert to [0,1] format for TensorFlow compatibility
    img = tf.image.convert_image_dtype(img, tf.float32)
    # resize image
    return label, tf.image.resize(img, [IMG_WIDTH, IMG_HEIGHT])
 
# Create a generator
rng = tf.random.Generator.from_seed(123, alg='philox')
 
def randomized_preprocessing(label, img):
    # randomly adjust image contrast - Data Augmentation
    contrast_factor = random.random() + 1.0
    img = tf.image.adjust_contrast(img,contrast_factor=contrast_factor)
    img = tf.image.stateless_random_flip_left_right(img,rng.make_seeds(2)[0])
    return label, img
 
# configuration
num_epochs = 3
batch_size = 64
shuffling_buffer_size = 5000
num_parallel_calls = tf.data.AUTOTUNE
prefetch_factor = tf.data.experimental.AUTOTUNE
 
 
# locate Places365 dataset in DSDIR and list them
places365_path = glob.glob(os.environ['DSDIR']+"/Places365-Standard/data_256/**/*.jpg", recursive=True)
random.Random(123).shuffle(places365_path)
 
# create a dataset object from path
dataset = tf.data.Dataset.from_tensor_slices(places365_path)
# distribute dataset
num_workers = idr_tf.size
worker_index = idr_tf.rank
dataset = dataset.shard(num_workers,worker_index).shuffle(shuffling_buffer_size)
 
# deterministic transformation
dataset = dataset.map(decode_img, num_parallel_calls=num_parallel_calls, deterministic=False)
# cache function only relevant for small to medium datasets
dataset = dataset.cache()
# random transformations
dataset = dataset.map(randomized_preprocessing, num_parallel_calls=num_parallel_calls, deterministic=False)
 
dataset = dataset.batch(batch_size, drop_remainder=True).prefetch(prefetch_factor)
 
## Repeat dataset num_epochs times
#dataset = dataset.repeat(num_epochs)
#for label, img in dataset:
#    train_step(label, img)   
## equivalent to:  
for epoch in range(num_epochs):
    for label, img in dataset:
        train_step(label, img)

Puis voici un exemple de la chaîne de chargement lorsque l'on charge un fichier TFRecord unique :

dataset = tf.data.TFRecordDataset(input_file)
dataset = dataset.shard(num_workers, worker_index)
dataset = dataset.shuffle(shuffle_buffer_size)
dataset = dataset.map(parser_fn, num_parallel_calls=num_map_threads)
dataset = dataset.batch(batch_size, drop_remainder=True).prefetch(prefetch_factor)
dataset = dataset.repeat(num_epochs)

Enfin, l'exemple de la chaîne de chargement lorsque l'on charge un Dataset partitionné en plusieurs TFRecords :

dataset = Dataset.list_files(pattern)
dataset = dataset.shard(num_workers, worker_index)
dataset = dataset.shuffle(shuffle_buffer_size)
dataset = dataset.interleave(tf.data.TFRecordDataset,
                 cycle_length=num_readers, block_length=1)
dataset = dataset.map(parser_fn, num_parallel_calls=num_map_threads)
dataset = dataset.batch(batch_size, drop_remainder=True).prefetch(prefetch_factor)
dataset = dataset.repeat(num_epochs)

Mise en pratique sur Jean Zay

Pour mettre en pratique la documentation ci-dessus et vous faire une idée des gains apportés par chacune des fonctionnalités proposées par le Dataset TensorFlow, vous pouvez récupérer le Notebook Jupyter notebook_data_preprocessing_tensorflow.ipynb dans le DSDIR. Par exemple, pour le récupérer dans votre WORK :

$ cp $DSDIR/examples_IA/Tensorflow_parallel/notebook_data_preprocessing_tensorflow.ipynb $WORK

Vous pouvez également télécharger le Notebook ici.

Vous pouvez ensuite ouvrir et exécuter le Notebook depuis notre service jupyterhub. Pour l'utilisation de jupyterhub, vous pouvez consulter les documentations dédiées : Jean Zay : Accès à JupyterHub et documentation JupyterHub Jean-Zay

La création de fichiers TFRecord et leur utilisation, y sont aussi abordées.

Documentation officielle