
Spark Streaming Testing with Scala Example

Spark Streaming Testing

How do you create and automate tests of Spark Streaming applications?  In this post, we’ll show an example of one way in Scala.  This post is heavy on code examples and has the added bonus of using a code coverage plugin.

Are the tests in this tutorial unit tests?  Or are they integration tests?  Functional tests?  I don't know; you tell me in the comments below if you have an opinion.  If I had to choose, I'd say unit tests, because we are stubbing the streaming provider (with a queueStream, as you'll see below).

Pre-requisites

As I'm sure you can guess, you will need some Spark Streaming Scala code to test.  We're going to use the code from our Spark Streaming with Slack example in this post, so check that out first if you need some streaming Scala code to work with.  It's not required to use that code, though; you should be able to take the concepts presented here and apply them to your own code if desired.  All the testing code and the Spark Streaming example code are available to pull from GitHub anyhow.

We're going to use sbt to build and run the tests and to create coverage reports.  So, if you are not using sbt, please translate to your build tool accordingly.

Overview

In order to write automated tests for Spark Streaming, we're going to use a third-party library called ScalaTest.  Also, we're going to add an sbt plugin called sbt-scoverage.  Then, with these tools in hand, we can write some Scala test code and create test coverage reports.

Steps

  1. Pull the Spark Streaming code example from GitHub
  2. Update build.sbt
  3. Create project/plugins.sbt
  4. Write Scala test code
  5. Execute tests and coverage reports

Pull Spark Streaming Code Example from Github

If you don't want to copy and paste code, you can pull it from GitHub.  Just clone the spark-course repo from https://github.com/tmcgrath/spark-course; the project we are working from is in the spark-streaming-tests directory.
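For example, something like git clone https://github.com/tmcgrath/spark-course.git followed by cd spark-course/spark-streaming-tests should put you in the right directory.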

Updates to Previous build.sbt

build.sbt should be updated to include a new command alias as well as the ScalaTest third-party library, as seen below:

Spark Streaming tests build.sbt update

 scalaVersion := "2.11.8"
 
+addCommandAlias("sanity", ";clean ;compile ;coverage ;test; coverageReport")
 
 resolvers += "jitpack" at "https://jitpack.io"
@@ -19,5 +21,6 @@ libraryDependencies ++= Seq(
   // comment above line and uncomment the following to run in sbt
   // "org.apache.spark" %% "spark-streaming" % "1.6.1",
   "org.scalaj" %% "scalaj-http" % "2.3.0",
-  "org.jfarcand" % "wcs" % "1.5"
+  "org.jfarcand" % "wcs" % "1.5",
+  "org.scalatest" %% "scalatest" % "2.2.6" % "test"
 )

Notice how we append "test" to the end of the scalatest dependency to indicate the library is only needed when compiling and running tests.
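The same pattern applies to any other test-only dependency you might add later.  For example, a hypothetical mockito dependency (not part of this project's build) would be scoped the same way:

libraryDependencies += "org.mockito" % "mockito-core" % "1.10.19" % "test"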

Create project/plugins.sbt

Add a new line for the sbt-scoverage plugin as seen here:

SBT code coverage plugin

addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.3.5")

Write Scala Tests

Actually, before we write the tests themselves, we're going to update our previous SlackStreamingApp's main method to facilitate automated testing.  I know, I know, if we had written SlackStreamingApp with TDD, we wouldn't have to do this, right?

Anyhow, it’s not a huge change.

SlackStreamingApp updates for testability

 object SlackStreamingApp {
 
   def main(args: Array[String]) {
     val conf = new SparkConf().setMaster(args(0)).setAppName("SlackStreaming")
     val ssc = new StreamingContext(conf, Seconds(5))
     val stream = ssc.receiverStream(new SlackReceiver(args(1)))
     stream.print()
-    if (args.length > 2) {
-      stream.saveAsTextFiles(args(2))
-    }
+
+    processStream(args, stream)
+
     ssc.start()
     ssc.awaitTermination()
   }
 
+  // Extracted so tests can exercise the output logic with any DStream[String]
+  def processStream(args: Array[String], stream: DStream[String]): Unit = {
+    args match {
+      case Array(_, _, path, _*) => stream.saveAsTextFiles(path)
+      case _ => return
+    }
+  }
+
 }

As you can hopefully see, we just needed to extract the code that looks for a command-line arg into a new function called processStream.  Also, we need to add one more line to the imports at the top:

Spark Scala DStream import

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

Next, we write the testing code.  To start, create the src/test/scala/com/supergloo directory to hold the test code.  Then add the test code to this directory by creating the following Scala file: src/test/scala/com/supergloo/SlackStreamingTest.scala

Spark Streaming Scalatest

package com.supergloo
  
import com.supergloo.SlackStreamingApp._
import org.apache.hadoop.mapred.InvalidInputException
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ClockWrapper, Seconds, StreamingContext}
import org.scalatest.concurrent.Eventually
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
  
import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.reflect.io.Path
import scala.util.Try
  
class SlackStreamingTest extends FlatSpec with Matchers with Eventually with BeforeAndAfter {
  
  private val master = "local[1]"
  private val appName = "spark-streaming-test"
  private val filePath: String = "target/testfile"
  
  private var ssc: StreamingContext = _
  
  private val batchDuration = Seconds(1)
  
  var clock: ClockWrapper = _
  
  before {
    // Configure Spark to use a ManualClock so the tests can advance
    // streaming time deterministically through ClockWrapper
    val conf = new SparkConf()
      .setMaster(master).setAppName(appName)
      .set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")

    ssc = new StreamingContext(conf, batchDuration)
    clock = new ClockWrapper(ssc)
  }
  
  after {
    if (ssc != null) {
      ssc.stop()
    }
    Try(Path(filePath + "-1000").deleteRecursively)
  }
  
  "Slack Streaming App " should " store streams into a file" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)
  
    dstream.print()
    processStream(Array("", "", filePath), dstream)
  
  
    ssc.start()
  
    lines += ssc.sparkContext.makeRDD(Seq("b", "c"))
    clock.advance(1000)
  
    eventually(timeout(2 seconds)){
      val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")
      wFile.count() should be (2)
      wFile.collect().foreach(println)
    }
  
  }
  
  "Slack Streaming App " should " store empty streams if no data received" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)
  
    dstream.print()
    processStream(Array("", "", filePath), dstream)
  
  
    ssc.start()
  
    clock.advance(1000)
  
    eventually(timeout(1 seconds)){
      val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")
      wFile.count() should be (0)
      wFile.collect().foreach(println)
    }
  
  }
  
  "Slack Streaming App " should " not store streams if argument is not passed" in {
    val lines = mutable.Queue[RDD[String]]()
    val dstream = ssc.queueStream(lines)
  
    dstream.print()
    processStream(Array("", ""), dstream)
  
    val wFile: RDD[String] = ssc.sparkContext.textFile(filePath+ "-1000")
  
    ssc.start()
  
    lines += ssc.sparkContext.makeRDD(Seq("b", "c"))
    clock.advance(2000)
  
    eventually(timeout(3 seconds)){
      a [InvalidInputException] should be thrownBy {
        wFile.count() should be (0)
      }
    }
  }
}

Next, we need to create additional directories and add ClockWrapper.scala to src/test/scala/org/apache/spark/streaming/.  More on this class below.

Spark Streaming ClockWrapper

package org.apache.spark.streaming
  
import org.apache.spark.util.ManualClock
  
/**
  * This class is defined in this package as the ManualClock is
  * private in the "spark" package
  */
class ClockWrapper(ssc: StreamingContext) {
  
  def getTimeMillis(): Long = manualClock().getTimeMillis()
  
  def setTime(timeToSet: Long) = manualClock().setTime(timeToSet)
  
  def advance(timeToAdd: Long) = manualClock().advance(timeToAdd)
  
  def waitTillTime(targetTime: Long): Long = manualClock().waitTillTime(targetTime)
  
  private def manualClock(): ManualClock = {
    ssc.scheduler.clock.asInstanceOf[ManualClock]
  }
}

(By the way, ClockWrapper is taken from an approach I saw on Spark unit testing.  See the “Additional Resources” section below for the link.)

Ok, we’re ready to execute now.

Execute Scala tests and coverage reports

In the spark-streaming-tests directory, we can now issue sbt sanity from the command line.  You should see all three tests pass:

Spark Streaming Scalatest results

[info] SlackStreamingTest:
[info] Slack Streaming App
[info] - should store streams into a file
[info] Slack Streaming App
[info] - should store empty streams if no data received
[info] Slack Streaming App
[info] - should not store streams if argument is not passed
[info] Run completed in 4 seconds, 436 milliseconds.
[info] Total number of tests run: 3
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

To review coverage reports, simply open target/scala-2.11/scoverage-report/index.html in a browser.
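If you'd rather not use the sanity command alias, the equivalent steps can be run individually using the standard sbt-scoverage workflow, e.g. sbt clean coverage test coverageReport.  And a single suite can be run on its own with sbt "testOnly com.supergloo.SlackStreamingTest".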

Conclusion

Hopefully, this Spark Streaming test example helps jump-start your own Spark Streaming testing approach.  We covered a code example, how to run the tests, and how to view the test coverage results.  If you have any questions or comments, let me know.  Also, subscribe to the Supergloo YouTube channel for an upcoming screencast based on this post.

Additional Resources

Featured image credit https://flic.kr/p/dgSbYM

Reference: Spark Streaming Testing with Scala Example from our JCG partner Todd McGrath at the Supergloo blog.

Todd McGrath

Todd is a consultant in data engineering and software development using Scala, Apache Spark, Groovy, Python, and relational, columnar, and NoSQL databases. He is a 20-year software veteran and founder of supergloo, inc.