MapReduce Tutorial

This document comprehensively describes all user-facing facets of the Hadoop MapReduce framework and serves as a tutorial.

Ensure that Hadoop is installed, configured and running. More details: Single Node Setup for first-time users; Cluster Setup for large, distributed clusters.

Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.

A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executing the failed tasks.

Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System (see HDFS Architecture Guide) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.

The MapReduce framework consists of a single master JobTracker and one slave TaskTracker per cluster-node. The master is responsible for scheduling the jobs' component tasks on the slaves, monitoring them and re-executing the failed tasks. The slaves execute the tasks as directed by the master.

Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract-classes. These, and other job parameters, comprise the job configuration. The Hadoop job client then submits the job (jar/executable etc.) and configuration to the JobTracker, which then assumes the responsibility of distributing the software/configuration to the slaves, scheduling tasks, monitoring them, and providing status and diagnostic information to the job-client.
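
To make that concrete, here is a minimal, hypothetical driver sketch. It uses Hadoop's built-in IdentityMapper and IdentityReducer so that it is self-contained; the class name PassThroughJob is invented for illustration and is not part of Hadoop, and the WordCount example later in this tutorial shows the same wiring with real map and reduce logic.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;
    import org.apache.hadoop.mapred.lib.IdentityMapper;
    import org.apache.hadoop.mapred.lib.IdentityReducer;

    // Hypothetical driver: copies its text input to the output unchanged.
    public class PassThroughJob {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(PassThroughJob.class);  // the job configuration
        conf.setJobName("pass-through");

        // TextInputFormat supplies <LongWritable offset, Text line> records,
        // which the identity mapper and reducer emit unchanged.
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(IdentityMapper.class);
        conf.setReducerClass(IdentityReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // Input/output locations are taken from the command line.
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        // The job client submits the jar and configuration to the JobTracker.
        JobClient.runJob(conf);
      }
    }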

Although the Hadoop framework is implemented in Java™, MapReduce applications need not be written in Java. Hadoop Streaming is a utility which allows users to create and run jobs with any executables (e.g. shell utilities) as the mapper and/or the reducer. Hadoop Pipes is a SWIG-compatible C++ API to implement MapReduce applications (non-JNI™ based).

The MapReduce framework operates exclusively on <key, value> pairs, that is, the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types.

The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework.
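
As an illustration of what such a key class involves, the sketch below shows a hypothetical WordPair key (it is not part of Hadoop or of the example that follows) implementing WritableComparable so the framework can serialize it and sort on it. A real key would usually also override hashCode() and equals() so the default partitioner distributes it sensibly.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    // Hypothetical composite key: a pair of words, serializable and sortable.
    public class WordPair implements WritableComparable<WordPair> {
      private String first = "";
      private String second = "";

      public void write(DataOutput out) throws IOException {    // serialization
        out.writeUTF(first);
        out.writeUTF(second);
      }

      public void readFields(DataInput in) throws IOException { // deserialization
        first = in.readUTF();
        second = in.readUTF();
      }

      public int compareTo(WordPair other) {                     // used when sorting map outputs
        int cmp = first.compareTo(other.first);
        return cmp != 0 ? cmp : second.compareTo(other.second);
      }
    }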

Input and Output types of a MapReduce job:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

Before we jump into the details, let's walk through an example MapReduce application to get a flavour for how they work.

WordCount is a simple application that counts the number of occurrences of each word in a given input set.

This works with a local-standalone, pseudo-distributed or fully-distributed Hadoop installation (Single Node Setup).

Source Code

WordCount.java:

1.  package org.myorg;
2.
3.  import java.io.IOException;
4.  import java.util.*;
5.
6.  import org.apache.hadoop.fs.Path;
7.  import org.apache.hadoop.conf.*;
8.  import org.apache.hadoop.io.*;
9.  import org.apache.hadoop.mapred.*;
10. import org.apache.hadoop.util.*;
11.
12. public class WordCount {
13.
14.   public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
15.     private final static IntWritable one = new IntWritable(1);
16.     private Text word = new Text();
17.
18.     public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
19.       String line = value.toString();
20.       StringTokenizer tokenizer = new StringTokenizer(line);
21.       while (tokenizer.hasMoreTokens()) {
22.         word.set(tokenizer.nextToken());
23.         output.collect(word, one);
24.       }
25.     }
26.   }
27.
28.   public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
29.     public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
30.       int sum = 0;
31.       while (values.hasNext()) {
32.         sum += values.next().get();
33.       }
34.       output.collect(key, new IntWritable(sum));
35.     }
36.   }
37.
38.   public static void main(String[] args) throws Exception {
39.     JobConf conf = new JobConf(WordCount.class);
40.     conf.setJobName("wordcount");
41.
42.     conf.setOutputKeyClass(Text.class);
43.     conf.setOutputValueClass(IntWritable.class);
44.
45.     conf.setMapperClass(Map.class);
46.     conf.setCombinerClass(Reduce.class);
47.     conf.setReducerClass(Reduce.class);
48.
49.     conf.setInputFormat(TextInputFormat.class);
50.     conf.setOutputFormat(TextOutputFormat.class);
51.
52.     FileInputFormat.setInputPaths(conf, new Path(args[0]));
53.     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54.
55.     JobClient.runJob(conf);
56.   }
57. }

Usage

Assuming HADOOP_HOME is the root of the installation and HADOOP_VERSION is the Hadoop version installed, compile WordCount.java and create a jar:

$ mkdir wordcount_classes
$ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar -d wordcount_classes WordCount.java
$ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ .

Assuming that:
/usr/joe/wordcount/input  - input directory in HDFS
/usr/joe/wordcount/output - output directory in HDFS

$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World Bye World

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2

Applications can specify a comma-separated list of paths which would be present in the current working directory of the task using the option -files. The -libjars option allows applications to add jars to the classpaths of the maps and reduces. The option -archives allows them to pass a comma-separated list of archives as arguments. These archives are unarchived, and a link with the name of the archive is created in the current working directory of tasks. More details about the command line options are available in the Commands Guide.

Running the wordcount example with -libjars, -files and -archives:

hadoop jar hadoop-examples.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output

Here, myarchive.zip will be placed and unzipped into a directory by the name "myarchive.zip".

Users can specify a different symbolic name for files and archives passed through the -files and -archives options, using #.

For example,

hadoop jar hadoop-examples.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output

Here, the files dir1/dict.txt and dir2/dict.txt can be accessed by tasks using the symbolic names dict1 and dict2 respectively. The archive mytar.tgz will be placed and unarchived into a directory by the name "tgzdir".
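
To give a flavour of how a task might consume such a file, here is a hypothetical fragment (DictionaryAwareMapper is invented for illustration and is not part of the examples above) that loads the symlinked dict1 from the task's current working directory in configure():

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;

    // Hypothetical base class: loads the cached dictionary once per task.
    public abstract class DictionaryAwareMapper extends MapReduceBase {
      protected Set<String> dictionary = new HashSet<String>();

      public void configure(JobConf job) {
        try {
          // "dict1" is the symbolic name given via -files dir1/dict.txt#dict1;
          // the framework links it into the task's current working directory.
          BufferedReader reader = new BufferedReader(new FileReader("dict1"));
          String term;
          while ((term = reader.readLine()) != null) {
            dictionary.add(term.trim());
          }
          reader.close();
        } catch (IOException e) {
          throw new RuntimeException("Could not load dict1", e);
        }
      }
    }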

Walk-through

The WordCount application is quite straight-forward.

The Mapper implementation (lines 14-26), via the map method (lines 18-25), processes one line at a time, as provided by the specified TextInputFormat (line 49). It then splits the line into tokens separated by whitespaces, via the StringTokenizer, and emits a key-value pair of < <word>, 1>.

For the given sample input the first map emits:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

The second map emits:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>

We'll learn more about the number of maps spawned for a given job, andhow to control them in a fine-grained manner, a bit later in thetutorial.
