WordCount on Hadoop with Scala
Hadoop is a great technology built with Java.
Today we will use Scala to implement a simple MapReduce job and then run it using HDInsight. First we add the assembly plugin to our assembly.sbt:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Then we add the Hadoop core dependency to our build.sbt file. We also configure the merge strategy to avoid deduplication errors:

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"
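For completeness, a minimal build.sbt combining the dependency, the merge strategy, and the project metadata could look like the sketch below. The project name, version, and Scala version are assumptions inferred from the assembly jar path mentioned at the end of the post, not taken verbatim from the original build.

// Hypothetical build.sbt; name, version and scalaVersion are assumed
// from the jar path target/scala-2.12/ScalaHadoop-assembly-1.0.jar.
name := "ScalaHadoop"
version := "1.0"
scalaVersion := "2.12.1"

libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"

// Drop duplicate META-INF entries coming from transitive dependencies,
// keep the first copy of everything else.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}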
We will use WordCount as our example. The original Java class is transformed into a Scala one.
package com.gkatzioura.scala

import java.lang.Iterable
import java.util.StringTokenizer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}

import scala.collection.JavaConverters._

/**
  * Created by gkatzioura on 2/14/17.
  */
package object WordCount {

  class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {

    val one = new IntWritable(1)
    val word = new Text()

    override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
      val itr = new StringTokenizer(value.toString)
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken())
        context.write(word, one)
      }
    }
  }

  class IntSumReader extends Reducer[Text, IntWritable, Text, IntWritable] {

    override def reduce(key: Text, values: Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      val sum = values.asScala.foldLeft(0)(_ + _.get)
      context.write(key, new IntWritable(sum))
    }
  }

  def main(args: Array[String]): Unit = {
    val configuration = new Configuration
    val job = Job.getInstance(configuration, "word count")
    job.setJarByClass(this.getClass)
    job.setMapperClass(classOf[TokenizerMapper])
    job.setCombinerClass(classOf[IntSumReader])
    job.setReducerClass(classOf[IntSumReader])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))
    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}
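The mapper emits a (word, 1) pair per token and the reducer sums the counts per word. To make that data flow concrete without a cluster, here is a minimal plain-Scala sketch of the same computation; the sample input lines and the object name are made up for illustration.

// A plain-Scala sketch (no Hadoop) of what the job computes.
// The input lines and the object name are hypothetical.
object WordCountSketch extends App {

  val lines = Seq("hello hadoop", "hello scala")

  // "map" phase: split each line into tokens and emit (word, 1) pairs
  val mapped = lines.flatMap(_.split("\\s+")).map(word => (word, 1))

  // "reduce" phase: group by word and sum the counts,
  // mirroring the foldLeft in IntSumReader
  val reduced = mapped.groupBy(_._1).map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  reduced.foreach { case (word, count) => println(s"$word\t$count") }
}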
Then we build our example:
sbt clean compile assembly
Our new jar will reside in target/scala-2.12/ScalaHadoop-assembly-1.0.jar. In the next post we will run our code using Azure's HDInsight.
You can find the code on GitHub.
Reference: WordCount on Hadoop with Scala from our JCG partner Emmanouil Gkatziouras at the gkatzioura blog.