Hadoop, une introduction - 2/3
Par Kadda SAHNINEPublié le | novembre 10, 2009 |
Après quelques présentations liminaires du projet Hadoop et du modèle MapReduce, nous aborderons dans ce second article les questions liées à l’architecture du framework ainsi que les aspects pratiques du développement. L’application de comptage de mots présenté dans le premier volet de la série constitue le fil conducteur de la série. La lecture de ce billet est donc un préalable.
Un peu de terminologie
Un job MapReduce, ou job pour faire court, est une unité de traitement mettant en œuvre un jeu de données en entrée, un programme MapReduce (packagé dans un jar) et des éléments de configuration.
L’exécution d’un job MapReduce suppose donc un partage de responsabilité entre éléments de développement (jeu de données, programme MapReduce, fichiers de configuration Hadoop) et éléments d’infrastructure (Hadoop), ce qui peut se décomposer ainsi :
- Configuration du job (définie par l’utilisateur)
- Découpage du jeu de données et distribution sur le cluster (géré par Hadoop)
- Démarrage de chaque tâche map avec son propre jeu de données issu du découpage (géré par Hadoop)
- Exécution en parallèle de chaque fonction map (implémenté par l’utilisateur)
- Les sorties des fonctions map sont triées par clé pour former de nouvelles unités de données (géré par Hadoop)
- Démarrage des tâches reduce avec son propre jeu de données issu du tri (géré par Hadoop)
- Exécution en parallèle de chaque fonction reduce (implémenté par l’utilisateur)
- Assemblage du résultat des opérations reduce et stockage (géré par Hadoop)
Typologie d’un cluster Hadoop
Comme indiqué en introduction, Hadoop n’est pas un projet monolithique mais est constitué – dans son acception la plus courante – des composants Hadoop Common, MapReduce et HDFS.
Un cluster Hadoop est typiquement organisé en configuration maître/esclaves et fait cohabiter les agents (ou services) suivants :
- Un NameNode (exécuté sur la machine maître) et plusieurs DataNode (exécutés sur les machines esclaves)
- Un JobTracker (exécuté sur la machine maître) et plusieurs TaskManager (exécutés sur les machines esclaves)

Analysons plus en détail le rôle de chacun de ces services.
HDFS : le système de fichier distribué de Hadoop
Hadoop est conçu pour traiter des quantités gigantesques de données. C’est pourquoi l’utilité du système de fichier distribué HDFS fait sens, en particulier lorsque le jeu de données à traiter dépasse la capacité de stockage d’une seule machine. Sous HDFS, les fichiers sont partitionnés par blocs de 64 MB par défaut.
Un cluster HDFS repose sur deux types de composants majeurs, le NameNode et le DataNode :
- Le NameNode est la véritable pierre angulaire du système de fichier. Il gère l’espace de nommage et l’arborescence du système de fichiers, les métadonnées (noms, permissions, etc.) des fichiers et répertoires. Il centralise la localisation des blocs de données répartis sur le système. Sans Namenode, tous les fichiers peuvent être considérés comme perdus car il n’y aurait alors aucun moyen de reconstituer les fichiers à partir des blocs.
Il n’y a qu’une instance de NameNode par cluster HDFS. L’historique des modifications dans le système de fichier est géré par une instance secondaire cohabitant en backup. - Les DataNodes stockent et restituent les blocs de données. Par ailleurs, ils communiquent périodiquement au NameNode la liste des blocs qu’ils hébergent. L’écriture d’un bloc sur un DataNode peut être propagée en cascade par copie sur d’autres DataNodes.
Le processus de lecture d’un fichier sur HDFS commence par l’interrogation du NameNode afin de localiser les blocs sous-jacents. Pour chaque bloc, le NameNode renvoie l’adresse du DataNode le plus proche possédant une copie du bloc. L’unité de distance n’est autre que la bande passante disponible. Ainsi, plus la bande passante est importante entre un client et un DataNode, plus ce dernier est considéré comme proche.
Deux autres types de composants permettent de contrôler le processus d’exécution d’un job : un JobTracker et plusieurs TaskTrackers.
JobTracker et TaskTrackers
- Le JobTracker coordonne l’exécution des jobs sur l’ensemble du cluster. Il communique avec les TaskTrackers en leur attribuant des tâches d’exécution (map ou reduce). Dans le cas d’utilisation (théorique) présenté dans le premier article de la série, le JobTracker distribuerait 3 tâches map et 5 tâches reduce.
Par ailleurs, il permet d’avoir une vision globale sur la progression ou l’état du traitement distribué via une console d’administration web accessible par défaut sur le port 50030.
Le JobTracker est un démon cohabitant avec le NameNode. Il n’y a donc qu’une instance par cluster. - Les TaskTrackers exécutent les tâches (map ou reduce) au sein d’une nouvelle JVM instantiée par le TaskTracker. Ainsi, un crash de la machine virtuelle n’impactera pas le TaskTracker.
Par ailleurs, ils notifient périodiquement le JobTracker du niveau de progression d’une tâche ou bien le notifient en cas d’erreur afin que celui-ci puissent reprogrammer et assigner une nouvelle tâche.
Un TaskTracker est un démon cohabitant avec un DataNode. Il y a donc autant d’instances que de nœuds esclaves.
La communication entre les nœuds (NameNode/DataNode, JobTracker/TaskTracker) s’effectue par RPC.
Éléments d’architecture physique
Un cluster Hadoop peut être constitué d’un ensemble de machines hétérogènes, aussi bien du point de vue matériel que du point de vue logiciel (système d’exploitation) mais un cluster de machines homogènes est plus aisé à administrer.
Un dispositif matériel organisé en racks de 30 à 40 serveurs est typique de ce type d’architecture :

Développement d’une application MapReduce
Installation de Hadoop
- Télécharger une copie de la distribution sur un des miroirs Apache :
[inovia@ns1 inovia]$ wget http://apache.crihan.fr/dist/hadoop/core/stable/hadoop-0.20.1.tar.gz
- Décompresser l’archive :
[inovia@ns1 inovia]$ tar zxvf hadoop-0.20.1.tar.gz
- Positionner les variables d’environnement suivantes :
export JAVA_HOME=/home/inovia/java/jdk1.6.0_17
export HADOOP_INSTALL=/home/inovia/hadoop-0.20.1
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_INSTALL/bin
- Vérifier l’installation :
[inovia@ns1 inovia]$ hadoop version
Hadoop 0.20.1
Subversion http://svn.apache.org/repos/asf/hadoop/common/tags/release-0.20.1-rc1 -r 810220
Compiled by oom on Tue Sep 1 20:55:56 UTC 2009
Note : Pour une installation sous Windows, il vous faudra installer les Cygwin tools au préalable.
Implémentation de la fonction Map
Ci-dessous l’implémentation de l’interface Mapper fournie par l’API Hadoop. La fonction map décompose les enregistrements du jeu de données en mots, chacun donnant lieu à une occurrence. Notez qu’il n’y a pas de comptage, celui-ci étant opéré par la fonction reduce.
-
.package fr.inoviaconseil.hadoop;
-
.
-
.import java.io.IOException;
-
.import java.util.StringTokenizer;
-
.import org.apache.hadoop.io.IntWritable;
-
.import org.apache.hadoop.io.LongWritable;
-
.import org.apache.hadoop.io.Text;
-
.import org.apache.hadoop.mapred.MapReduceBase;
-
.import org.apache.hadoop.mapred.Mapper;
-
.import org.apache.hadoop.mapred.OutputCollector;
-
.import org.apache.hadoop.mapred.Reporter;
-
.
-
.class WordCountMapper extends MapReduceBase implements Mapper {
-
. private final static IntWritable mDefOcc = new IntWritable(1);
-
. private Text mWord = new Text();
-
. public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException {
-
. while (lIt.hasMoreTokens()) {
-
. mWord.set(lIt.nextToken());
-
. output.collect(mWord, mDefOcc);
-
. }
-
. }
-
.}
Implémentation de la fonction Reduce
Ci-dessous l’implémentation de l’interface Reducer fournie par l’API Hadoop. La fonction reduce compte le nombre d’occurrences.
-
.package fr.inoviaconseil.hadoop;
-
.
-
.import java.io.IOException;
-
.import java.util.Iterator;
-
.import org.apache.hadoop.io.IntWritable;
-
.import org.apache.hadoop.io.LongWritable;
-
.import org.apache.hadoop.io.Text;
-
.import org.apache.hadoop.mapred.MapReduceBase;
-
.import org.apache.hadoop.mapred.Reducer;
-
.import org.apache.hadoop.mapred.OutputCollector;
-
.import org.apache.hadoop.mapred.Reporter;
-
.
-
.public class WordCountReducer extends MapReduceBase implements Reducer {
-
. public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {
-
. int nbOcc = 0;
-
. while (values.hasNext()) {
-
. nbOcc += values.next().get();
-
. }
-
. output.collect(key, new IntWritable(nbOcc));
-
. }
-
.}
Implémentation du client
Le client est une simple classe Java avec une méthode main(), dont le premier paramètre est le fichier de données à traiter, le second paramètre étant le répertoire de stockage des fichiers résultat.
L’assemblage avec les classes Mapper et Reducer est opéré grâce à la classe JavaConf. L’ensemble de ces classes constitue l’application MapReduce.
-
.package fr.inoviaconseil.hadoop;
-
.
-
.import org.apache.hadoop.fs.Path;
-
.import org.apache.hadoop.io.IntWritable;
-
.import org.apache.hadoop.io.Text;
-
.import org.apache.hadoop.mapred.TextInputFormat;
-
.import org.apache.hadoop.mapred.TextOutputFormat;
-
.import org.apache.hadoop.mapred.FileInputFormat;
-
.import org.apache.hadoop.mapred.FileOutputFormat;
-
.import org.apache.hadoop.mapred.JobClient;
-
.import org.apache.hadoop.mapred.JobConf;
-
.
-
.public class WordCount {
-
. JobConf conf = new JobConf(WordCount.class);
-
. conf.setJobName("Compteur de mots");
-
. conf.setOutputKeyClass(Text.class);
-
. conf.setOutputValueClass(IntWritable.class);
-
. conf.setMapperClass(WordCountMapper.class);
-
. conf.setCombinerClass(WordCountReducer.class);
-
. conf.setReducerClass(WordCountReducer.class);
-
. conf.setInputFormat(TextInputFormat.class);
-
. conf.setOutputFormat(TextOutputFormat.class);
-
. FileInputFormat.setInputPaths(conf, new Path(args[0]));
-
. FileOutputFormat.setOutputPath(conf, new Path(args[1]));
-
. JobClient.runJob(conf);
-
. }
-
.}
Enfin, les classes Mapper, Reducer et le client sont packagés dans un jar (wordclient.jar)
Exécution du programme MapReduce
Hadoop supporte 3 modes d’exécution :
- le mode local (ou standalone) où tout s’exécute au sein d’une seule JVM (pas de démons Hadoop). Ce mode est préconisé en phase de développement.
- le mode pseudo-distribué simulant le fonctionnement en mode cluster où les démons Hadoop sont exécutés localement.
- le mode distribué correspondant au cas où les démons Hadoop sont exécutés sur tous les nœuds du cluster.
Évidemment, notre cas d’utilisation est trivial, c’est tout naturellement qu’on exécutera l’application MapReduce en mode local :
hadoop jar wordcount.jar fr.inoviaconseil.hadoop.WordCount ./input/data.txt ./output/
Le jeu de données en entrée contient le texte suivant :
[inovia@ns1 wordcount]$ cat input/data.txt
savoir faire
et savoir etre
sans faire savoir
Le résultat de l’exécution est généré dans le répertoire output :
[inovia@ns1 wordcount]$ cat output/part-00000
et 1
etre 1
faire 2
sans 1
savoir 3
Le troisième et dernier article de la série sera l’occasion de dresser un panorama de l’écosystème Hadoop.
Articles en relation :
Comments
3 Responses to “Hadoop, une introduction - 2/3”
Laisser un commentaire
novembre 13th, 2009 @ 9:01
[…] Hadoop, une introduction 2/3 Catégorie: cloud computing, hadoop […]
janvier 6th, 2010 @ 13:29
[…] Dans ce troisième et dernier volet de la série, nous dresserons un panorama de l’écosystème Hadoop. La lecture des deux premiers billets est un préalable utile. […]
juin 19th, 2010 @ 16:45
Добрый день! < a href=”http://tehnon.ru/mail/ jose@tehnon.ru” >…< /a >…
с ув….