WordCount on Hadoop with Scala
Hadoop is a great technology built with Java.
Today we will use Scala to implement a simple MapReduce job and then run it using HDInsight. First we add the sbt-assembly plugin in our assembly.sbt file:
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Then we add the Hadoop core dependency to our build.sbt file. We also configure the merge strategy so that the assembly does not fail with deduplicate errors: everything under META-INF is discarded, and for any other conflicting path the first copy found is kept.

    assemblyMergeStrategy in assembly := {
      case PathList("META-INF", xs @ _*) => MergeStrategy.discard
      case x => MergeStrategy.first
    }

    libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
We will use WordCount as our example, porting the original Java class to Scala.
    package com.gkatzioura.scala

    import java.lang.Iterable
    import java.util.StringTokenizer

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
    import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}

    import scala.collection.JavaConverters._

    /**
      * Created by gkatzioura on 2/14/17.
      */
    package object WordCount {

      // Emits a (word, 1) pair for every token of each input line.
      class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {

        val one = new IntWritable(1)
        val word = new Text()

        override def map(key: Object, value: Text,
                         context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
          val itr = new StringTokenizer(value.toString)
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken())
            context.write(word, one)
          }
        }
      }

      // Sums the counts emitted for each word; also used as the combiner.
      class IntSumReader extends Reducer[Text, IntWritable, Text, IntWritable] {

        override def reduce(key: Text, values: Iterable[IntWritable],
                            context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
          val sum = values.asScala.foldLeft(0)(_ + _.get)
          context.write(key, new IntWritable(sum))
        }
      }

      def main(args: Array[String]): Unit = {
        val configuration = new Configuration
        val job = Job.getInstance(configuration, "word count")
        job.setJarByClass(this.getClass)
        job.setMapperClass(classOf[TokenizerMapper])
        job.setCombinerClass(classOf[IntSumReader])
        job.setReducerClass(classOf[IntSumReader])
        job.setOutputKeyClass(classOf[Text])
        job.setOutputValueClass(classOf[IntWritable])
        // args(0) is the input path, args(1) the output path.
        FileInputFormat.addInputPath(job, new Path(args(0)))
        FileOutputFormat.setOutputPath(job, new Path(args(1)))
        System.exit(if (job.waitForCompletion(true)) 0 else 1)
      }
    }
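To see what the mapper and reducer compute without a cluster, the same logic can be sketched in plain Scala. This is only an illustration: LocalWordCount, mapLine, reduceCounts and wordCount are hypothetical names that are not part of the project, and the groupBy call merely stands in for Hadoop's shuffle phase.

```scala
import java.util.StringTokenizer

// Hypothetical local stand-in for the job: no Hadoop classes are involved.
object LocalWordCount {

  // Map phase: emit a (word, 1) pair for every whitespace-separated token,
  // mirroring what TokenizerMapper.map writes to its context.
  def mapLine(line: String): List[(String, Int)] = {
    val itr = new StringTokenizer(line)
    val pairs = List.newBuilder[(String, Int)]
    while (itr.hasMoreTokens()) pairs += ((itr.nextToken(), 1))
    pairs.result()
  }

  // Reduce phase: sum the values of one key, as IntSumReader.reduce does.
  def reduceCounts(values: Iterable[Int]): Int = values.foldLeft(0)(_ + _)

  // Shuffle stand-in: group the mapped pairs by word, then reduce each group.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(mapLine)
      .groupBy(_._1)
      .map { case (word, pairs) => word -> reduceCounts(pairs.map(_._2)) }

  def main(args: Array[String]): Unit = {
    val counts = wordCount(Seq("hello hadoop", "hello scala"))
    counts.toSeq.sortBy(_._1).foreach { case (word, n) => println(s"$word\t$n") }
  }
}
```

For the two sample lines, the result maps hello to 2 and both hadoop and scala to 1. The real job produces the same kind of pairs, only spread across mappers and reducers.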
Then we build our example:
sbt clean compile assembly
Our new jar will reside at target/scala-2.12/ScalaHadoop-assembly-1.0.jar. In the next post we shall run our code using Azure’s HDInsight.
You can find the code on GitHub.
Reference: WordCount on Hadoop with Scala from our JCG partner Emmanouil Gkatziouras at the gkatzioura blog.