WordCount with Storm and Scala
Apache Storm is a free and open source distributed realtime computation system running on the JVM. To get started we will implement a very simple example. Previously we implemented a word count Hadoop job in Scala and uploaded it to HDInsight.
We will focus on the same word count concept, but for the real-time case, and implement a word count topology using Apache Storm. Our source code is based on the official Storm examples.
Storm topologies are built from spouts, which emit streams of tuples, and bolts, which consume those streams and process or transform them.
First we shall implement a spout which emits fake data events, in our case sentences.
package com.gkatzioura.scala.storm

import org.apache.storm.spout.SpoutOutputCollector
import org.apache.storm.task.TopologyContext
import org.apache.storm.topology.OutputFieldsDeclarer
import org.apache.storm.topology.base.BaseRichSpout
import org.apache.storm.tuple.{Fields, Values}
import org.apache.storm.utils.Utils

import scala.util.Random

/**
 * Created by gkatzioura on 2/17/17.
 */
class RandomSentenceSpout extends BaseRichSpout {

  var _collector: SpoutOutputCollector = _
  var _rand: Random = _

  override def nextTuple(): Unit = {
    Utils.sleep(100)
    val sentences = Array("the cow jumped over the moon", "an apple a day keeps the doctor away",
      "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature")
    val sentence = sentences(_rand.nextInt(sentences.length))
    _collector.emit(new Values(sentence))
  }

  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
    _collector = collector
    _rand = Random
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}
The next step is to implement a bolt which splits each sentence into words and emits them.
package com.gkatzioura.scala.storm

import java.text.BreakIterator

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
 * Created by gkatzioura on 2/18/17.
 */
class SplitSentenceBolt extends BaseBasicBolt {

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val sentence = input.getString(0)
    val boundary = BreakIterator.getWordInstance
    boundary.setText(sentence)

    var start = boundary.first
    var end = boundary.next
    // Stop before BreakIterator.DONE (-1) is ever used as a substring index
    while (end != BreakIterator.DONE) {
      val word = sentence.substring(start, end).replaceAll("\\s+", "")
      if (!word.equals("")) {
        collector.emit(new Values(word))
      }
      start = end
      end = boundary.next
    }
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word"))
  }
}
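BreakIterator walks locale-aware word boundaries. If plain whitespace tokenization is enough for your input, a simpler execute body would also do; this is just a sketch, not what the example above uses:

override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
  // Hypothetical simpler variant: tokenize on whitespace only
  input.getString(0).split("\\s+").filter(_.nonEmpty)
    .foreach(word => collector.emit(new Values(word)))
}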
And the last step is the word count bolt. Note that each bolt task keeps its own in-memory counts map; the fields grouping used in the topology below guarantees that all occurrences of a given word are routed to the same task.
package com.gkatzioura.scala.storm

import org.apache.storm.topology.{BasicOutputCollector, OutputFieldsDeclarer}
import org.apache.storm.topology.base.BaseBasicBolt
import org.apache.storm.tuple.{Fields, Tuple, Values}

/**
 * Created by gkatzioura on 2/18/17.
 */
class WordCountBolt extends BaseBasicBolt {

  val counts = scala.collection.mutable.Map[String, Int]()

  override def execute(input: Tuple, collector: BasicOutputCollector): Unit = {
    val word = input.getString(0)
    val count = counts.getOrElse(word, 0) + 1
    counts.put(word, count)
    // Emit the updated count for this word, not the whole map
    collector.emit(new Values(word, Int.box(count)))
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
    declarer.declare(new Fields("word", "count"))
  }
}
The final step is to create our topology, which takes care of whether we run locally or in a cluster environment.
package com.gkatzioura.scala.storm

import org.apache.storm.{Config, LocalCluster, StormSubmitter}
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields

/**
 * Created by gkatzioura on 2/18/17.
 */
object WordCountTopology {

  def main(args: Array[String]): Unit = {
    val builder = new TopologyBuilder
    builder.setSpout("spout", new RandomSentenceSpout, 5)
    builder.setBolt("split", new SplitSentenceBolt, 8).shuffleGrouping("spout")
    builder.setBolt("count", new WordCountBolt, 12).fieldsGrouping("split", new Fields("word"))

    val conf = new Config()
    conf.setDebug(true)

    if (args != null && args.length > 0) {
      // A topology name was passed: submit to the cluster
      conf.setNumWorkers(3)
      StormSubmitter.submitTopology(args(0), conf, builder.createTopology())
    } else {
      // No arguments: run in an in-process local cluster for ten seconds
      conf.setMaxTaskParallelism(3)
      val cluster = new LocalCluster
      cluster.submitTopology("word-count", conf, builder.createTopology())
      Thread.sleep(10000)
      cluster.shutdown()
    }
  }
}
Now we shall build our app. To do so we need to include the assembly plugin in our project/plugins.sbt file.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
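Depending on your transitive dependencies, the assembly step further down may fail with deduplicate errors on overlapping files (typically under META-INF). If that happens, a merge strategy along these lines can be added to the build.sbt shown next; the patterns here are an assumption, so adjust them to the conflicts you actually hit:

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard // drop clashing metadata files
  case _                             => MergeStrategy.first   // otherwise keep the first copy seen
}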
Our build.sbt file is as follows:
name := "ScalaStorm" version := "1.0" scalaVersion := "2.12.1" scalacOptions += "-Yresolve-term-conflict:package" libraryDependencies += "org.apache.storm" % "storm-core" % "1.0.2" % "provided"
And then we issue a build:
sbt clean compile assembly
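By default sbt-assembly writes the fat jar to target/scala-2.12/ under a name like ScalaStorm-assembly-1.0.jar (an assumption based on the default naming convention; check your actual output). Submitting it to a running Storm cluster then comes down to the standard storm jar command, where the trailing argument becomes args(0), i.e. the topology name:

storm jar target/scala-2.12/ScalaStorm-assembly-1.0.jar com.gkatzioura.scala.storm.WordCountTopology word-count-topology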
You can find the source code on GitHub.
In the next post we shall deploy our Storm app to HDInsight.
Reference: WordCount with Storm and Scala from our JCG partner Emmanouil Gkatziouras at the gkatzioura blog.