
Writing a Hadoop MapReduce task in Java

Although the Hadoop framework itself is written in Java, MapReduce jobs can be written in many other languages. In this post I show how to create a MapReduce job in Java, based on a Maven project like any other Java project.

    • Prepare the example input

Let's start with a fictional business case. In this case we need a CSV file with English words from a dictionary and all translations in other languages added to it, separated by a '|' symbol. I have based this example on this post. So the job will read dictionaries of different languages and match each English word with its translations in the other languages. The input dictionaries for the job are taken from here. I downloaded a few files in different languages and put them together in one file (Hadoop is better at processing one large file than many small ones). My example file can be found here.
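To make the data flow concrete, here is a purely illustrative fragment (the real dictionary files may look slightly different): each input line is assumed to hold an English word, a tab and one or more comma-separated translations, which matches how the input format and mapper below parse the data.

cat<TAB>chat
cat<TAB>gato
cat<TAB>Katze

For such input the job should produce a single output line per English word:

cat<TAB>|chat|gato|Katze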

    • Create the Java MapReduce project

The next step is creating the Java code for the MapReduce job. Like I said before, I use a Maven project for this, so I created a new empty Maven project in my IDE, IntelliJ. I modified the default pom to add the necessary plugins and dependencies.
The dependency I added:

<dependency>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-core</artifactId>
   <version>1.2.0</version>
   <scope>provided</scope>
</dependency>

The Hadoop dependency is necessary to make use of the Hadoop classes in my MapReduce job. Since I want to run the job on AWS EMR, I make sure I have a matching Hadoop version. Furthermore, the scope can be set to 'provided' since the Hadoop framework itself will be available on the Hadoop cluster.

Besides the dependency, I added the following two plugins to the pom.xml:

<plugins>
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-jar-plugin</artifactId>
    <configuration>
      <archive>
        <manifest>
          <addClasspath>true</addClasspath>
          <mainClass>net.pascalalma.hadoop.Dictionary</mainClass>
        </manifest>
      </archive>
    </configuration>
  </plugin>
  <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <configuration>
      <source>1.6</source>
      <target>1.6</target>
    </configuration>
  </plugin>
</plugins>

The first plugin is used to create an executable JAR of our project. This makes running the JAR on the Hadoop cluster easier since we don't have to state the main class.
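For example, with the main class in the manifest the job can be started with a command along the following lines (the JAR name is just an illustration and depends on your artifactId and version), without having to add net.pascalalma.hadoop.Dictionary as an extra argument:

hadoop jar hadoop-dictionary-1.0-SNAPSHOT.jar /input/dictionary.txt /output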

The second plugin is necessary to make the created JAR compatible with the instances of the AWS EMR cluster. This AWS cluster comes with JDK 1.6, so class files compiled for a newer Java version cannot be loaded there. If you omit this plugin the job will fail on the cluster (I got a message like 'Unsupported major.minor version 51.0', which means the classes were compiled for Java 7 while the cluster runs Java 6). I will show in a later post how to set up this AWS EMR cluster.

That is the basic project setup, just like a regular Java project. Let's implement the MapReduce classes next.

    • Implement the MapReduce classes

I have described the functionality that we want to perform in the first step. To achieve this I created three Java classes in my Hadoop project. The first class is the 'Mapper':

package net.pascalalma.hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 16-07-13
 * Time: 12:07
 */
public class WordMapper extends Mapper<Text,Text,Text,Text> {

    private Text word = new Text();

    @Override
    public void map(Text key, Text value, Context context) throws IOException, InterruptedException
    {
        // The value can contain several comma-separated translations;
        // write a separate (key, translation) pair for each of them.
        StringTokenizer itr = new StringTokenizer(value.toString(),",");
        while (itr.hasMoreTokens())
        {
            word.set(itr.nextToken());
            context.write(key, word);
        }
    }
}

This class isn't very complicated. It just receives a key/value pair from the input file, splits the value on commas and writes a separate (key, value) pair for every token, so the same key may be emitted multiple times at this stage.
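As a purely illustrative example (the actual dictionary contents may differ): for an input pair with key 'cat' and value 'chat,minou' the mapper writes the two pairs (cat, chat) and (cat, minou).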

The next class is the 'Reducer', which reduces the mapper output to the wanted result:

package net.pascalalma.hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 17-07-13
 * Time: 19:50
 */
public class AllTranslationsReducer extends Reducer<Text, Text, Text, Text> {

    private Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Collect all translations for this key, each one preceded by a '|' symbol.
        String translations = "";

        for (Text val : values) {
            translations += "|" + val.toString();
        }

        result.set(translations);
        context.write(key, result);
    }
}

This reduce step collects all values for a given key and puts them after each other, separated by a '|' symbol.
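To illustrate with the same made-up data: if the reducer receives the key 'cat' with the values 'chat', 'gato' and 'Katze', it writes a single output record with key 'cat' and value '|chat|gato|Katze'.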

The final class is the one that puts it all together to make it a runnable job:

package net.pascalalma.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 16-07-13
 * Time: 12:07
 */
public class Dictionary {

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "dictionary");
        job.setJarByClass(Dictionary.class);
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(AllTranslationsReducer.class);
        // Both the final job output and the intermediate map output are (Text, Text) pairs.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // KeyValueTextInputFormat splits every input line into a key and a value,
        // by default at the first tab character.
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0])); 
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

In this main method we put together a Job and run it. Please note that I simply expect args[0] and args[1] to be the names of the input file and the (not yet existing) output directory. I didn't add any check for this (a minimal sketch of such a check is shown below the screenshot). Here is my 'Run Configuration' in IntelliJ:

[Screenshot: 'Run Configuration' in IntelliJ]
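Such a check is not part of the original code, but a minimal sketch of a guard at the top of the main method could look like this (the exact usage message is just an example):

        if (args.length != 2) {
            System.err.println("Usage: Dictionary <input path> <output directory>");
            System.exit(2); // abort before submitting the job
        }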

Just make sure the output directory does not exist at the time you run the class. The logging output created by the job looks like this:

2013-08-15 21:37:00.595 java[73982:1c03] Unable to load realm info from SCDynamicStore
aug 15, 2013 9:37:01 PM org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 1
aug 15, 2013 9:37:01 PM org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
WARNING: Snappy native library not loaded
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Running job: job_local_0001
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : null
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 0% reduce 0%
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: 
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000000_0' done.
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Task initialize
INFO:  Using ResourceCalculatorPlugin : null
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: 
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Merging 1 sorted segments
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Down to the last merge-pass, with 1 segments left of total size: 524410 bytes
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: 
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: 
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_r_000000_0' to /Users/pascal/output
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 100% reduce 0%
aug 15, 2013 9:37:07 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: reduce > reduce
aug 15, 2013 9:37:07 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_r_000000_0' done.
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO:  map 100% reduce 100%
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Job complete: job_local_0001
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Counters: 17
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:   File Output Format Counters 
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Bytes Written=423039
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:   FileSystemCounters
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     FILE_BYTES_READ=1464626
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     FILE_BYTES_WRITTEN=1537251
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:   File Input Format Counters 
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Bytes Read=469941
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:   Map-Reduce Framework
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce input groups=11820
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Map output materialized bytes=524414
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Combine output records=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Map input records=20487
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce shuffle bytes=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce output records=11820
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Spilled Records=43234
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Map output bytes=481174
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Total committed heap usage (bytes)=362676224
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Combine input records=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Map output records=21617
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     SPLIT_RAW_BYTES=108
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO:     Reduce input records=21617

Process finished with exit code 0

The output file created by this job can be found in the supplied output directory as can be seen in the next screenshot:

[Screenshot: the output file in the supplied output directory]
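On a local run like this, Hadoop typically writes the reducer output to a file named part-r-00000 in that directory, possibly accompanied by a _SUCCESS marker file; the exact names may differ depending on your Hadoop version and configuration.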

As you have seen, we can run this main method in an IDE (or from the command line), but I would like to see some unit tests performed on the Mapper and Reducer before we go there. I will show how to do that in another post.
 

Pascal Alma

Pascal is a senior JEE Developer and Architect at 4Synergy in The Netherlands. Pascal has been designing and building J2EE applications since 2001. He is particularly interested in the Open Source tool stack (Mule, Spring Framework, JBoss) and in technologies like Web Services, SOA and Cloud. Specialties: JEE, SOA, Mule ESB, Maven, Cloud Technology, Amazon AWS.