Big Data : premiers pas avec MapReduce, brique centrale d'Hadoop

Big Data : premiers pas avec MapReduce, brique centrale d'Hadoop Le modèle MapReduce est conçu pour lire, traiter et écrire des volumes massifs de données. Des bonnes feuilles issues de l'ouvrage Big Data chez Eni.

MapReduce est un modèle de programmation conçu spécifiquement pour lire, traiter et écrire des volumes de données très importants. Un programme Hadoop met généralement en œuvre à la fois des tâches de type map et des tâches de type reduce.

MapReduce implémente les fonctionnalités suivantes :

 Parallélisation automatique des programmes Hadoop.

 Gestion transparente du mode distribué.

 Tolérance aux pannes.

Plus généralement, MapReduce simplifie grandement la vie du développeur Hadoop, en lui masquant une bonne partie du fonctionnement interne de Hadoop.

Ainsi, lorsqu'il écrit le code du mapper (cf. section suivante), le développeur Hadoop travaille comme s'il ne devait traiter qu'un seul enregistrement. Le passage d'un enregistrement à un autre, ainsi que la détection de la fin du fichier sont pris en charge par Hadoop.

3. Principes de fonctionnement de MapReduce

Un programme Hadoop se divise généralement en trois parties :

 Le driver, qui s'exécute sur une machine client, est chargé de configurer le job puis de le soumettre pour exécution.

 Le mapper est chargé de lire les données stockées sur disque et les traiter.

 Le reducer est chargé de consolider les résultats issus du mapper puis de les écrire sur disque.

4. MapReduce du point de vue du développeur Java

Bien que les exemples de cette section soient principalement issus de programmes écrits en Java, les principes de fonctionnement de MapReduce restent valables quel que soit le langage de programmation utilisé.

4.1 Les entrées-sorties

Dans MapReduce, les données sont toujours lues ou écrites selon le format <key, value> (<clef, valeur>). Cette façon de procéder peut paraître réductrice, mais en fait :

 On s'aperçoit à l'usage qu'il est possible, avec un peu d'expérience et d'astuce, de traiter beaucoup de problèmes de cette manière, y compris, par exemple, la recherche du plus court chemin dans un graphe orienté acyclique.

 Le fait d'imposer une structure unique et simple aux enregistrements, tant en lecture qu'en écriture, contribue à l'efficacité d'Hadoop au niveau des entrées-sorties.

Les données lues par le mapper sont définies au niveau du driver. La définition des données comprend :

 Leur localisation (fichier ou répertoire).

 Le type des enregistrements, qui est défini par la classe InputFormat.

 La détermination de la taille des InputSplits : un InputSplit définit le volume des données qui seront lues à chaque opération de lecture :
 - L'InputSplit a le plus souvent la taille d'un bloc HDFS, mais ce n'est pas une obligation.
 - Il est du ressort de Hadoop de faire le lien entre enregistrements et InputSplits d'une part, et entre InputSplits et blocs HDFS d'autre part.
 - Il n'y a en effet aucune garantie que la fin d'un bloc corresponde à la fin d'un enregistrement et, le plus souvent, le dernier enregistrement d'un bloc est tronqué.

Hadoop prend en compte par défaut les types d'enregistrement suivants :

 TextInputFormat :
 - Type par défaut.
 - value est une ligne entière terminée par \n.
 - key est l'offset de la ligne depuis le début de fichier (elle est souvent sans intérêt pour le problème à traiter et, dans ce cas, il suffit de l'ignorer).

 KeyValueTextInputFormat :
 - Chaque ligne est supposée être au format <key><separator><value>\n.
 - Le separator par défaut est tab.

 SequenceFileInputFormat :
 - Permet de lire un fichier binaire de paires <key, value>, comprenant éventuellement des métadonnées.

 SequenceFileAsTextInputFormat :
 - Format identique au précédent mais, en plus, convertit les clefs et les valeurs en strings (<key.toString(), value.toString()>).

Dans Hadoop :

 Les clefs sont des objets qui implémentent l'interface ComparableWritable.

 Les valeurs sont des objets qui implémentent l'interface Writable.

L'utilisation de l'interface Writable permet d'optimiser le processus de sérialisation lors des accès disque. Tout type de données dans Hadoop doit implémenter Writable.

Par défaut, Hadoop propose les types de données suivants :

 IntWritable (int en Java).

 LongWritable (long en Java).

 FloatWritable (float en Java).

 DoubleWritable (double en Java).

 Text (string en Java).

 etc.

4.2 La phase map (exemple 1)

Le programme WordCount utilisé dans ce paragraphe pour expliquer le fonctionnement d'un mapper sera également utilisé pour expliquer le fonctionnement d'un reducer.

Le programme WordCount est écrit en pseudo-code et est simplifié à l'extrême. Dans un premier temps, on fait l'hypothèse d'un cluster Hadoop disposant seulement d'un mapper et d'un reducer. En outre, le fichier en entrée ne comporte que quatre enregistrements.

Une version plus réaliste de WordCount sera déclinée de manière plus détaillée dans différents langages de programmation au chapitre Développer des programmes Hadoop). Il s'agit de concevoir un programme Hadoop comptant le nombre d'occurrences des différents mots composant un livre.

Voici le contenu du fichier (livre) en entrée :

am stram gram
pic et pic et colégram
bour et bour et ratatam
am stram gram


Un enregistrement correspond à une ligne terminée par \n. Dans un véritable programme Hadoop, on utilisera le format TextInputFormat pour lire les enregistrements.

Voici le mapper du programme WordCount :

1.function map(LongWritable inKey1, String inValue1)
2. foreach word w in inValue1:
3. write(w, 1)

Ligne 1 : le mapper lit en entrée un enregistrement qui se présente sous la forme d'un couple <key, value> avec :

 value du type String (c'est une ligne du fichier).

 key du type LongWritable (c'est l'offset de la ligne depuis le début du fichier).

Ligne 2 : pour chaque mot w de la ligne en cours...

Ligne 3 : ...on écrit dans un fichier en sortie le couple <w, 1>, 1 correspondant à une occurrence du mot contenu dans la variable w.

Le fichier en sortie de mapper sera donc le suivant :

am,1
stram,1
gram,1
pic,1
et,1
pic,1
et,1
colégram,1
bour,1
et,1
bour,1
et,1
ratatam,1
am,1
stram,1
gram,1


On notera qu'en sortie de mapper :

 Les résultats, dits intermédiaires, sont aussi stockés sous forme de <key, value>, avec key du type Text et value du type intWritable.

 La valeur en entrée (un mot) est devenue une clef en sortie.

 La clef en entrée de mapper (offset de la ligne) ne joue aucun rôle dans le programme.
 

4.3 Entre la phase map et la phase reduce (exemple 1)

Avant d'être envoyé au reducer, le fichier est automatiquement trié par clef par Hadoop : c'est ce que l'on appelle la phase de "shuffle & sort". Le fichier en entrée du reducer est le suivant :

am,[1,1]
bour,[1,1]
colégram,[1]
et,[1,1,1,1]
gram,[1,1]
pic,[1,1]
ratatam,[1]
stram,[1,1]


4.4 La phase reduce (exemple 1)

Voici le reducer du programme Hadoop permettant de consolider les résultats issus du mapper :

1.function reduce(Text inKey2, Iterator<intWritable> inValue2)
2. set wordCount = 0
3. foreach v in inValue2:
4. wordCount = wordCount + v
5. write(inKey2, wordCount)


Ligne 1 : le reducer prend en entrée un enregistrement sous la forme d'un couple <key, value> avec :

 key du type Text (c'est un mot).

 value étant une liste de valeurs du type intWritable.

Ligne 2 : le reducer remet à zéro le compteur wordCount lorsque l'on change de mot (rappelons que le changement de mot correspond à un changement de clef, et donc d'enregistrement, et qu'il est géré par Hadoop).

Ligne 3 : pour chaque valeur v dans la liste inValue2 on ajoute v à word-Count (dans notre exemple v vaut toujours 1).

ligne 4 : quand on change de mot, on écrit dans un fichier en sortie le couple <inKey2, wordCount>, wordCount étant le nombre d'occurrences du mot contenu dans la variable inKey2.

Le fichier en sortie de reducer sera donc le suivant :

am,2
bour,2
colégram,1
et,4
gram,2
pic,2
ratatam,1
stram,2


Nous venons d'écrire notre premier programme Hadoop, permettant de compter le nombre d'occurrences des différents mots composant un livre !

Il faut néanmoins reconnaître qu'avec un seul mapper et un seul reducer les performances de notre programme ne seront pas meilleures que celle d'un programme classique s'exécutant sur une seule machine.

Pour tirer parti des spécificités de Hadoop, nous allons faire évoluer notre cluster en le configurant pour qu'il dispose de :

 Quatre mappers.

 Deux reducers.

Voyons ce qui se passe maintenant lorsque l'on lance le programme dans le nouveau cluster (ce qu'il y a de bien avec Hadoop, c'est qu'un programme qui fonctionne avec un mapper et un reducer fonctionnera aussi avec quatre mappers et deux reducers, ou avec cent mappers et cinquante reducers, etc. : il n'y a d'ailleurs aucune modification à apporter à notre pseudo-code).

4.5 La phase map (exemple 2)

Chacun des quatre mappers va travailler sur une partie du fichier en entrée, par exemple :

 Le mapper n° 1 va traiter la ligne 1.

 Le mapper n° 2 va traiter la ligne 2.

 Le mapper n° 3 va traiter la ligne 3.

 Le mapper n° 4 va traiter la ligne 4.

Chaque mapper va produire un fichier de résultats intermédiaires :

mapreduce
La phase de map. © Eni

4.6 Entre la phase map et la phase reduce (exemple 2)

Avant le transfert des résultats intermédiaires des mappers vers les reducers :

 Les enregistrements sont triés par clef.

 Tous les enregistrements correspondant à une même clef sont envoyés vers un seul et même reducer.

Hadoop garantit :

 Que si un reducer reçoit le couple am,1 du mapper1, alors il recevra aussi le couple am,1 du mapper4.

 Que tous les enregistrements correspondant à la clef am seront regroupés et envoyés au même reducer.

On est ainsi sûr que le comptage sera exact.

Les fichiers en entrée des reducers seront par exemple :

mapreduce 2
Les fichiers en entrée des reducers © Eni

4.7 La phase reduce (exemple 2)

Les fichier en sortie des reducers seront alors les suivants :

mapreduce 3
Les fichier en sortie des reducers. © Eni

Il ne reste plus qu'à fusionner les fichiers issus des deux reducers pour obtenir le résultat cherché.

Par rapport au premier cluster (un mapper et un reducer), ce nouveau cluster (quatre mappers et deux reducers) permet :

 De diviser par un facteur de l'ordre de quatre le temps d'exécution de la phase map.

 De diviser par un facteur de l'ordre de deux le temps d'exécution de la phase reduce.

Le gain peut paraître dérisoire dans notre exemple dont l'intérêt est avant tout pédagogique mais, dans la réalité, Hadoop a permis à des entreprises de diviser par un facteur 10 les temps d'exécution de certains travaux informatiques. Une telle amélioration peut être vitale lorsque les travaux concernés doivent tourner tous les jours, et que leur temps d'exécution dans un environnement informatique classique avoisine les 24 heures.

4.8 Quelques remarques générales concernant les mappers et reducers

Un job Hadoop comprend au minimum un mapper : il peut ne pas comprendre de phase reduce.

Durant la phase de shuffle & sort, tous les mappers sont susceptibles d'envoyer des données vers tous les reducers. Cette phase pourrait constituer un goulot d'étranglement en saturant le réseau : des mécanismes au sein de Hadoop permettent d'éviter qu'une telle situation se produise :

 Chaque reducer fournit une partie du résultat global : si l'on souhaite disposer d'un seul fichier regroupant l'ensemble des résultats en sortie d'un job Hadoop, il faut paramétrer le job avec un seul reducer. Attention cependant à l'impact d'un tel choix sur le niveau de performance du job.

 Il n'existe aucune garantie quant à l'ordre de traitement des données d'un job Hadoop. Une conséquence de cette situation est que Hadoop ne peut pas traiter n'importe quel problème de manière efficace :

 - Rechercher un maximum ou un minimum dans un ensemble de données,
compter des données... sont des exemples de problèmes qu'il est aisé de
traiter avec Hadoop.

 - Calculer la moyenne d'un ensemble de données est un problème qu'il est
plus malaisé de traiter avec Hadoop, car le calcul de la moyenne n'est pas
une opération associative : si l'on divise l'ensemble des données en plusieurs
parties – chacune correspondant par exemple à un mapper – la
moyenne de l'ensemble ne sera généralement pas égale à la moyenne des
moyennes des parties.

Cet article est constitué à partir de bonnes feuilles extraites de l'ouvrage "Big Data - Concepts et mise en œuvre de Hadoop" de Laurent Jolia-Ferrier publié aux Editions Eni.