WordCount with Storm and Scala
Apache Storm is a free and open source distributed real-time computation system running on the JVM. To get started we will implement a very simple example. Previously we implemented a word count Hadoop job using Scala and uploaded it to HDInsight.
We will focus on the same word count concept, but this time for the real-time case, and implement a word count topology utilizing Apache Storm. Our source code will be based on the official Storm examples.
Storm works with spouts and bolts: a spout is a source of streams that emits tuples into the topology, while a bolt consumes tuples, processes them and may emit new tuples downstream.
First we shall implement a spout which emits fake data events; in our case, random sentences.
```scala
package com.gkatzioura.scala.storm

import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}
import org.apache.storm.utils.Utils

import scala.util.Random

/**
  * Created by gkatzioura on 2/17/17.
  */
class RandomSentenceSpout extends BaseRichSpout {

  var _collector: SpoutOutputCollector = _
  var _rand: Random = _

  // Called repeatedly by Storm; emits one random sentence per invocation.
  override def nextTuple(): Unit = {
    Utils.sleep(100)
    val sentences = Array("the cow jumped over the moon",
      "an apple a day keeps the doctor away",
      "four score and seven years ago",
      "snow white and the seven dwarfs",
      "i am at two with nature")
    val sentence = sentences(_rand.nextInt(sentences.length))
    _collector.emit(new Values(sentence))
  }

  // Called once when the spout task is initialized.
  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    _collector = collector
    _rand = Random
  }

  // Declares the single output field; it carries a full sentence,
  // named "word" as in the official example.
  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}
```
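A note on the spout's contract: Storm invokes `nextTuple`, `ack` and `fail` on the same thread, so `nextTuple` should never block; the `Utils.sleep(100)` call simply throttles emission to roughly ten sentences per second.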
The next step is to implement a bolt which splits each sentence into words and emits them.
```scala
package com.gkatzioura.scala.storm

import java.text.BreakIterator

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class SplitSentenceBolt extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val sentence = input.getString(0)
    val boundary = BreakIterator.getWordInstance
    boundary.setText(sentence)

    // Walk the word boundaries; advance `end` before taking the substring and
    // stop once the iterator reports DONE, so substring is never called with -1.
    var start = boundary.first
    var end = boundary.next
    while (end != BreakIterator.DONE) {
      val word = sentence.substring(start, end).replaceAll("\\s+", "")
      start = end
      end = boundary.next
      if (!word.isEmpty) {
        collector.emit(new Values(word))
      }
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}
```
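Since the bolt extends `BaseBasicBolt`, tuples emitted through the `BasicOutputCollector` are automatically anchored to the input tuple, and the input is acked once `execute` returns; with `BaseRichBolt` we would have to do both by hand.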
And the last step is the word count bolt.
```scala
package com.gkatzioura.scala.storm

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
  * Created by gkatzioura on 2/18/17.
  */
class WordCountBolt extends BaseBasicBolt {

  // Per-task in-memory counts; the fieldsGrouping in the topology ensures
  // the same word always reaches the same task, keeping the counts correct.
  val counts = scala.collection.mutable.Map[String, Int]()

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val word = input.getString(0)
    val count = counts.getOrElse(word, 0) + 1
    counts.put(word, count)
    // Box the Int explicitly for the java varargs Values(Object...) constructor.
    collector.emit(new Values(word, Int.box(count)))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word", "count"))
  }
}
```
The final step is to create our topology, which handles both running locally and submitting to a cluster environment.
```scala
package com.gkatzioura.scala.storm

import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields

/**
  * Created by gkatzioura on 2/18/17.
  */
object WordCountTopology {

  def main(args: Array[String]): Unit = {

    println("Hello, world!")

    val builder = new TopologyBuilder
    builder.setSpout("spout", new RandomSentenceSpout, 5)
    builder.setBolt("split", new SplitSentenceBolt, 8).shuffleGrouping("spout")
    builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split", new Fields("word"))

    val conf = new Config()
    conf.setDebug(true)

    if (args != null && args.length > 0) {
      // Submit to a cluster, using the first argument as the topology name.
      conf.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), conf, builder.createTopology())
    } else {
      // Run in an in-process LocalCluster for ten seconds, then shut down.
      conf.setMaxTaskParallelism(3)
      val cluster = new LocalCluster
      cluster.submitTopology("word-count", conf, builder.createTopology())
      Thread.sleep(10000)
      cluster.shutdown()
    }
  }
}
```
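Note the groupings: `shuffleGrouping` distributes sentences randomly among the splitter tasks, while `fieldsGrouping` on `"word"` guarantees that tuples carrying the same word always reach the same `WordCountBolt` task, which is what keeps the per-task in-memory counts consistent. Also note the two branches in `main`: when arguments are present the topology is submitted to a cluster under the name `args(0)`, otherwise it runs for ten seconds inside an in-process `LocalCluster` before shutting down.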
Now we shall build our app. To do so we need to include the assembly plugin in our project/plugins.sbt file.
```scala
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
```
Our build.sbt file is as follows:
```scala
name := "ScalaStorm"

version := "1.0"

scalaVersion := "2.12.1"

scalacOptions += "-Yresolve-term-conflict:package"

libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided"
```
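Two optional additions, not from the original post but common with this setup: since storm-core is marked `provided` (the cluster supplies it at runtime), it is missing from the classpath of `sbt run`, so the `LocalCluster` branch cannot be started from sbt without putting provided dependencies back on the run classpath; and if `sbt assembly` reports duplicate files, a merge strategy resolves the conflicts. A sketch of both, assuming sbt 0.13 and sbt-assembly 0.14.x:

```scala
// Assumption, not in the original post: put "provided" dependencies back on
// the classpath of `sbt run`, so the LocalCluster branch can be tested locally.
run in Compile := Defaults.runTask(
  fullClasspath in Compile,
  mainClass in (Compile, run),
  runner in (Compile, run)
).evaluated

// Assumption: discard duplicate META-INF entries if `sbt assembly` reports
// merge conflicts between transitive dependencies.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case _                             => MergeStrategy.first
}
```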
And then we issue a build:
```
sbt clean compile assembly
```
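With sbt-assembly's defaults (an assumption based on the plugin's `name-assembly-version` naming convention, not stated in the post) the fat jar lands at `target/scala-2.12/ScalaStorm-assembly-1.0.jar`. To try it against a running Storm installation you would hand it to the `storm` CLI; the trailing argument becomes the topology name, since `main` uses `args(0)` when submitting:

```
storm jar target/scala-2.12/ScalaStorm-assembly-1.0.jar com.gkatzioura.scala.storm.WordCountTopology word-count-topology
```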
You can find the source code on GitHub.
In the next post we shall deploy our Storm app to HDInsight.
Reference: WordCount with Storm and Scala from our JCG partner Emmanouil Gkatziouras at the gkatzioura blog.