Apache Spark, Cassandra and Game of Thrones
Apache Spark with Cassandra is a powerful combination in data processing pipelines. In this post, we will build a Scala application with the Spark Cassandra combo and query battle data from Game of Thrones. Now, we’re not going to make any show predictions! But, we will show the most aggressive kings as well as kings which were attacked the most. So, you’ve got that goin for you.
Overview
Our primary focus is the technical highlights of Spark Cassandra integration with Scala. To do so, we will load up Cassandra with Game of Thrones battle data and then query it from Spark using Scala. We’ll run Spark both from a shell and by deploying a Spark driver program to a cluster. We’ll have examples of Scala case class marshalling courtesy of the DataStax connector as well as using SparkSQL to produce DataFrames. We’ll also mix in some sbt configuration.
There are screencasts and relevant links at the bottom of this post in the “Resources” section.
The intended audience of this Spark Cassandra tutorial is someone with beginning to intermediate experience with Scala and Apache Spark. If you would like to reach this level quickly and efficiently, please check out our On-Demand Apache Spark with Scala Training Course.
Pre-Requisites
- Apache Cassandra (see resources below)
- Downloaded Game of Thrones data (see resources below)
- Apache Spark
Outline
- Import data into Cassandra
- Write Scala code
- Test Spark Cassandra code in SBT shell
- Deploy Spark Cassandra to Spark Cluster with SBT and spark-submit
Spark Cassandra Example
Part 1: Prepare Cassandra
Let’s import the GOT battle data into Cassandra. To keep things simple, I’m going to use a locally running Cassandra instance. I started Cassandra with the bin/cassandra script on my Mac (use cassandra.bat on Windows, but you knew that already).
Next, connect to Cassandra with cqlsh and create a keyspace to use. This tutorial creates a “gameofthrones” keyspace:
CREATE KEYSPACE gameofthrones WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};
From here, we create a table for the battle data.
CREATE TABLE gameofthrones.battles (
  name TEXT,
  year INT,
  battle_number INT,
  attacker_king TEXT, defender_king TEXT,
  attacker_1 TEXT, attacker_2 TEXT, attacker_3 TEXT, attacker_4 TEXT,
  defender_1 TEXT, defender_2 TEXT, defender_3 TEXT, defender_4 TEXT,
  attacker_outcome TEXT,
  battle_type TEXT,
  major_death TEXT, major_capture TEXT,
  attacker_size TEXT, defender_size TEXT,
  attacker_commander TEXT, defender_commander TEXT,
  summer TEXT,
  location TEXT, region TEXT,
  note TEXT,
  PRIMARY KEY(battle_number)
);
Then import the battles data using Cassandra COPY as shown below. (See the Resources section below for where to download the data.)
BTW, I needed to run a Perl one-liner to convert the end-of-line characters in the CSV file from Mac to Unix: perl -pi -e 's/\r/\n/g' followed by the CSV file name. Your mileage may vary.
// update the battles.csv location as necessary
COPY gameofthrones.battles (
  name, year, battle_number, attacker_king, defender_king,
  attacker_1, attacker_2, attacker_3, attacker_4,
  defender_1, defender_2, defender_3, defender_4,
  attacker_outcome, battle_type, major_death, major_capture,
  attacker_size, defender_size, attacker_commander, defender_commander,
  summer, location, region, note)
FROM 'battles.csv'
WITH HEADER = true;
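If Perl isn’t handy, the same end-of-line cleanup can be done in a few lines of Scala. This is only a sketch of that step and not part of the original project: the object name LineEndings, the function toUnix and the sample string are made up for illustration. Unlike the Perl one-liner, it treats a Windows-style \r\n pair as a single line break.

```scala
// Hypothetical helper, not part of the tutorial's source code.
object LineEndings {
  // Convert Windows (\r\n) and classic Mac (\r) line endings to Unix (\n).
  // \r\n is handled first so it does not turn into two line breaks.
  def toUnix(text: String): String =
    text.replace("\r\n", "\n").replace("\r", "\n")

  def main(args: Array[String]): Unit = {
    // Made-up CSV content with Mac-style line endings.
    val macStyle = "name,year\rfirst,1\rsecond,2\r"
    print(toUnix(macStyle))
  }
}
```

To apply it to a real file, read the file contents into a string, pass them through toUnix and write the result back out before running COPY.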
That wraps Part 1. Let’s move on to Part 2 where we’ll write some Scala code.
Part 2: Spark Cassandra Scala Code
(Note: All of the following sample code is available on GitHub. Link in Resources section below.)
To begin, let’s lay out the skeleton structure of the project:
mkdir got-battles   # name it anything you'd like
cd got-battles      # if you named it got-battles
mkdir src
mkdir src/main
mkdir src/main/scala
mkdir src/main/scala/com
mkdir src/main/scala/com/supergloo
mkdir project
Next, we’re going to add some files for sbt and the sbt-assembly plugin.
First, the build file for sbt, got-battles/build.sbt:
name := "spark-cassandra-example"

version := "1.0"

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

// https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/5muNwRaCJnU
assemblyMergeStrategy in assembly <<= (assemblyMergeStrategy in assembly) { (old) =>
  {
    case PathList("META-INF", "io.netty.versions.properties") => MergeStrategy.last
    case x => old(x)
  }
}

scalaVersion := "2.10.6"

resolvers += "jitpack" at "https://jitpack.io"

libraryDependencies ++= Seq(
  // use provided line when building assembly jar
  // "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",
  // comment above line and uncomment the following to run in sbt
  "org.apache.spark" %% "spark-sql" % "1.6.1",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0"
)
and the one-line got-battles/project/assembly.sbt file:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
And now let’s create the Spark driver code in got-battles/src/main/scala/com/supergloo, in a file called SparkCassandra.scala:
package com.supergloo

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import com.datastax.spark.connector.rdd._
import org.apache.spark.sql.cassandra._

/**
 * Simple Spark Cassandra
 * One example with Scala case class marshalling
 * Another example using Spark SQL
 */
object SparkCassandra {

  case class Battle(
    battle_number: Integer,
    year: Integer,
    attacker_king: String,
    defender_king: String
  )

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("SparkCassandraExampleApp")

    // in case we're running in sbt shell such as: run local[5]
    if (args.length > 0) conf.setMaster(args(0))

    // so yes, I'm assuming Cassandra is running locally here.
    // adjust as needed
    conf.set("spark.cassandra.connection.host", "127.0.0.1")

    val sc = new SparkContext(conf)

    // Spark Cassandra Example one which marshalls to Scala case classes
    val battles: Array[Battle] = sc.cassandraTable[Battle]("gameofthrones", "battles")
      .select("battle_number", "year", "attacker_king", "defender_king")
      .collect() // collect() replaces the deprecated RDD.toArray

    battles.foreach { b: Battle =>
      println("Battle Number %s was defended by %s.".format(b.battle_number, b.defender_king))
    }

    // Spark Cassandra Example Two - Create DataFrame from Spark SQL read
    val sqlContext = new SQLContext(sc)

    val df = sqlContext.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "battles", "keyspace" -> "gameofthrones"))
      .load()

    df.show

    // Game of Thrones Battle analysis

    // Who were the most aggressive kings? (most attacker_kings)
    val countsByAttack = df.groupBy("attacker_king").count().sort(desc("count"))
    countsByAttack.show()

    // Which kings were attacked the most? (most defender_kings)
    val countsByDefend = df.groupBy("defender_king").count().sort(desc("count"))
    countsByDefend.show()

    sc.stop()
  }
}
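To make the two DataFrame queries above a little more concrete, here is roughly what groupBy(...).count().sort(desc("count")) computes, sketched with plain Scala collections so it runs without Spark or Cassandra. The object name BattleCounts, the helper countsBy and the three sample rows are invented for illustration; the rows are not taken from the battles data.

```scala
// Illustration only: plain Scala collections standing in for the DataFrame.
object BattleCounts {
  // Same field names as the tutorial's Battle case class (Int used for brevity).
  case class Battle(battle_number: Int, year: Int,
                    attacker_king: String, defender_king: String)

  // Count battles per key and order by count, largest first.
  // This mirrors groupBy(column).count().sort(desc("count")) on the DataFrame.
  def countsBy(battles: Seq[Battle])(key: Battle => String): Seq[(String, Int)] =
    battles.groupBy(key)
      .map { case (king, rows) => (king, rows.size) }
      .toSeq
      .sortBy { case (_, count) => -count }

  def main(args: Array[String]): Unit = {
    // Made-up rows, not real battle data.
    val sample = Seq(
      Battle(1, 1, "King A", "King B"),
      Battle(2, 1, "King A", "King C"),
      Battle(3, 2, "King B", "King A")
    )
    countsBy(sample)(_.attacker_king).foreach(println) // most aggressive first
    countsBy(sample)(_.defender_king).foreach(println) // most attacked first
  }
}
```

As with the DataFrame version, the order of kings that share the same count is not guaranteed.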
Part 3: Run Spark Cassandra Scala Code from SBT Console
Start up the sbt console by running sbt from the project directory. Once ready, you can issue the run command with an argument for your Spark master location, e.g. run local[5]
(Again, there’s a screencast at the end of this post which shows an example of running this command. See Resources section below.)
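The optional argument is what lets the same main method work in both places: in the sbt console the master comes from run local[5], while under spark-submit it comes from the --master flag and args is empty. A stripped-down sketch of that decision, with no Spark dependency, might look like the following. The object and function names are hypothetical.

```scala
// Hypothetical helper mirroring the `if (args.length > 0) conf.setMaster(args(0))`
// check in the driver; it has no Spark dependency so it can run anywhere.
object MasterArg {
  // Some(master) when a first argument was passed (e.g. `run local[5]` in sbt),
  // None when there are no arguments and the launcher is expected to supply it.
  def fromArgs(args: Array[String]): Option[String] = args.headOption

  def main(args: Array[String]): Unit =
    fromArgs(args) match {
      case Some(master) => println("Using master from arguments: " + master)
      case None         => println("No master argument; leaving it to the launcher")
    }
}
```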
Depending on your log level, you should see various outputs from the SparkCassandra code. These console outputs of data read from Cassandra are the indicator of success. Oh yeah. Say it with me now. Oh yeahhhhh
Running code in the sbt console is a convenient way to make and test changes rapidly. As I developed this code, there was a terminal open in one window and an editor open in another window. Whenever I made a Scala source code change and saved, I could simply re-run in the sbt console.
So now, let’s say we’ve reached the point of wanting to deploy this Spark program. Let’s find out how in the next section.
Part 4: Assemble Spark Cassandra Scala Code and Deploy to Spark Cluster
To build a jar and deploy to a Spark cluster, we need to make a small change to our build.sbt file. As you may have noticed from the code above, there are comments in the file which indicate what needs to be changed. We should uncomment this line:
// "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided",
and comment out this line:
"org.apache.spark" %% "spark-sql" % "1.6.1",
Then we can run sbt assembly from the command line to produce a Spark deployable jar. If you use the sample build.sbt file, this will produce target/scala-2.10/spark-cassandra-example-assembly-1.0.jar
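If toggling comments in build.sbt gets tedious, one alternative is to keep spark-sql marked as provided all the time and tell sbt to put provided dependencies back on the classpath for run. The setting below is a sketch of that approach, based on the sbt-assembly documentation. It is not part of the original project and has not been checked against this exact build, so treat it as an assumption to verify with your sbt version.

```scala
// build.sbt (sbt 0.13 syntax, matching the build file above)
// Assumption: with this setting in place, spark-sql can stay "provided"
// for both `sbt run` and `sbt assembly`.
run in Compile <<= Defaults.runTask(
  fullClasspath in Compile,
  mainClass in (Compile, run),
  runner in (Compile, run)
)
```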
To deploy, use spark-submit with the appropriate arguments, e.g.
spark-submit --class "com.supergloo.SparkCassandra" \
  --master spark://todd-mcgraths-macbook-pro.local:7077 \
  ./target/scala-2.10/spark-cassandra-example-assembly-1.0.jar
Conclusion
So, what do you think? When you run the code, you can see the most aggressive kings and the kings which were attacked the most. Without giving it away, I think one could argue whether Mance Rayder should be tied with Renly Baratheon on the most attacked list. But, that’s not really the point of this tutorial. As for the code and setup, do you have any questions, opinions or suggestions for next steps?
Spark Cassandra Tutorial Screencast
In the following screencast, I run through the steps described in this tutorial. Stay tuned because there is blooper footage at the end of the screencast. Because I mean, why not bloopers.
Spark Cassandra Tutorial Resources
- All source code including the battles.csv file I scrubbed using the perl script described above at Apache Spark Cassandra Example code
- https://github.com/datastax/spark-cassandra-connector
- DataFrames with Cassandra Connector
- Game of Thrones Data: https://github.com/chrisalbon/war_of_the_five_kings_dataset
Reference: Apache Spark, Cassandra and Game of Thrones from our JCG partner Todd McGrath at the Supergloo blog.