Reactive Database Access – Part 3 – Using jOOQ with Scala, Futures and Actors
We’re very happy to continue our guest post series on the jOOQ blog by Manuel Bernhardt. In this blog series, Manuel will explain the motivation behind so-called reactive technologies and, after introducing the concepts of Futures and Actors, use them in order to access a relational database in combination with jOOQ.
Manuel Bernhardt is an independent software consultant with a passion for building web-based systems, both back-end and front-end. He is the author of “Reactive Web Applications” (Manning) and he started working with Scala, Akka and the Play Framework in 2010 after spending a long time with Java. He lives in Vienna, where he is co-organiser of the local Scala User Group. He is enthusiastic about the Scala-based technologies and the vibrant community and is looking for ways to spread their usage in the industry. He has also been scuba-diving since age 6, and can’t quite get used to the lack of sea in Austria.
This series is split in three parts, which we have published over the past months:
- Part 1: Why Reactive, why “Async” & an introduction to Futures
- Part 2: Introduction to Actors
- Part 3: Using jOOQ with Scala, Futures and Actors
Introduction
In the previous two posts of this series we have introduced the benefits of reactive programming as well as two tools for putting it into practice, Futures and Actors. Now it is time to get your hands dirty, dear reader, and to create a simple application featuring reactive database access. Fear not, I will be there along the whole way to guide you through the process.
- Also, the source code of this example is available on Github
Getting the tools
In order to build the application, you will need a few tools. If you haven’t worked with Scala yet, the simplest for you may be to go and grab the Typesafe Activator which is a standalone project that brings in the necessary tools to build a Scala project from scratch.
Since this is about reactive database access, you will also need a database. For the purpose of this simple example, we’re going to use Oracle Database 12c Enterprise Edition. Nah, just kidding – it might be a bit cumbersome to get this one to run on your machine. Instead we will use the excellent PostgreSQL. Make sure to install it so that you can run the psql utility from your console.
Ready? Great! Let’s have a look at what we’re going to build.
The application
The goal of our application is to fetch mentions from Twitter and store them locally so as to be able to visualize them and perform analytics on them.
The core of this mechanism will be a MentionsFetcher actor which will periodically fetch mentions from Twitter and save them in the database. Once they are there, we can display useful information on a view.
Creating the database
The first step we’re going to take is to create the database. Create a mentions.sql file somewhere with the following content:
CREATE USER "play" NOSUPERUSER INHERIT CREATEROLE;

CREATE DATABASE mentions WITH OWNER = "play" ENCODING 'UTF8';

GRANT ALL PRIVILEGES ON DATABASE mentions to "play";

\connect mentions play

CREATE TABLE twitter_user (
  id bigserial primary key,
  created_on timestamp with time zone NOT NULL,
  twitter_user_name varchar NOT NULL
);

CREATE TABLE mentions (
  id bigserial primary key,
  tweet_id varchar NOT NULL,
  user_id bigint NOT NULL,
  created_on timestamp with time zone NOT NULL,
  text varchar NOT NULL
);
This script will create a play user, a mentions database as well as two tables, twitter_user and mentions.
To run it, execute the following command in a terminal:
psql -f mentions.sql
(note: you might need to explicitly declare which user runs this command, depending on how you have configured PostgreSQL to run)
Bootstrapping the project
Let’s create the reactive-mentions project, shall we? Assuming that you have installed the activator, run the following command in a terminal:
~/workspace » activator new reactive-mentions
This will prompt you with a list of templates. We are going to use the play-scala project template:
Fetching the latest list of templates...

Browse the list of templates: http://typesafe.com/activator/templates
Choose from these featured templates or enter a template name:
  1) minimal-akka-java-seed
  2) minimal-akka-scala-seed
  3) minimal-java
  4) minimal-scala
  5) play-java
  6) play-scala
(hit tab to see a list of all templates)
> 6
OK, application "reactive-mentions" is being created using the "play-scala" template.
[...]
At this point, a simple Play Framework project has been created in the reactive-mentions directory. If you want to, you can run this project by navigating to it and running the command activator run.
In order to work on the project, you can use one of the many IDEs that have Scala support. My personal favourite is to this day IntelliJ IDEA which does a pretty good job at this and also has built-in support for the Play Framework itself.
Setting up jOOQ
I wrote about database access in Scala about 2 years ago. There are to this day still quite a few alternatives to relational database access in Scala but at least personally I have now reached the conclusion that for the type of projects I work on, jOOQ beats them all when it comes to writing type-safe SQL. So without further ado let’s integrate it with our project.
There is an SBT plugin available for this if you would like, however for this application we will settle for a minimal, hand-crafted solution.
Bring up the build.sbt file in an editor and adjust the libraryDependencies to look like so:
libraryDependencies ++= Seq(
  jdbc,
  cache,
  ws,
  "org.postgresql" % "postgresql" % "9.4-1201-jdbc41",
  "org.jooq" % "jooq" % "3.7.0",
  "org.jooq" % "jooq-codegen-maven" % "3.7.0",
  "org.jooq" % "jooq-meta" % "3.7.0",
  specs2 % Test
)
If you are running the project’s console (which you can do by executing the activator command in the project’s directory) you will need to call the reload command in order for the new dependencies to be pulled in. This is true of any change you make to the build.sbt file. Don’t forget about it in the remainder of this article!
(note: make sure to use the version of the PostgreSQL driver that fits your version of PostgreSQL!)
Next, we need to set up jOOQ itself. For this purpose, create the file conf/mentions.xml, where conf is the directory used in the Play Framework for storing configuration-related files:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<configuration xmlns="http://www.jooq.org/xsd/jooq-codegen-3.7.0.xsd">
  <jdbc>
    <driver>org.postgresql.Driver</driver>
    <url>jdbc:postgresql://localhost/mentions</url>
    <user>play</user>
    <password></password>
  </jdbc>
  <generator>
    <name>org.jooq.util.ScalaGenerator</name>
    <database>
      <name>org.jooq.util.postgres.PostgresDatabase</name>
      <inputSchema>public</inputSchema>
      <includes>.*</includes>
      <excludes></excludes>
    </database>
    <target>
      <packageName>generated</packageName>
      <directory>target/scala-2.11/src_managed/main</directory>
    </target>
  </generator>
</configuration>
This configuration will allow us to run jOOQ’s ScalaGenerator, which will read the database schema and generate Scala-specific classes for it, storing them in a directory that is accessible on the classpath and meant for generated sources.
All that is left to do is to have a way to run jOOQ’s code generation. A simple solution that we are going to use is to create a custom SBT task in our project build. Go back to build.sbt and add the following at the end:
val generateJOOQ = taskKey[Seq[File]]("Generate JooQ classes")

val generateJOOQTask = (sourceManaged, fullClasspath in Compile, runner in Compile, streams) map { (src, cp, r, s) =>
  toError(r.run("org.jooq.util.GenerationTool", cp.files, Array("conf/mentions.xml"), s.log))
  ((src / "main/generated") ** "*.scala").get
}

generateJOOQ <<= generateJOOQTask

unmanagedSourceDirectories in Compile += sourceManaged.value / "main/generated"
The generateJOOQ task will run the GenerationTool using the mentions.xml file we have set up earlier on. Let’s run it!
Start the SBT console by running the activator command in your terminal window, in the reactive-mentions directory, and then run the generateJOOQ command:
[reactive-mentions] $ generateJOOQ
[info] Running org.jooq.util.GenerationTool conf/mentions.xml
[success] Total time: 1 s, completed Dec 11, 2015 2:55:08 PM
That’s it! If you want a bit more verbosity, add the following logger configuration to conf/logback.xml:
<logger name="org.jooq" level="INFO" />
Alright, we are now ready to get to the core of our endeavour: create the actor that will pull mentions from Twitter!
Creating the MentionsFetcher actor
For the purpose of fetching mentions at regular intervals from Twitter, we will be using a simple Akka actor. Actors are meant to do a lot more powerful things than this but for the sake of introducing the concept this example will do (or so I hope).
Go ahead and add Akka as well as its logging facility as library dependencies in build.sbt:
libraryDependencies ++= Seq(
  ...
  "com.typesafe.akka" %% "akka-actor" % "2.4.1",
  "com.typesafe.akka" %% "akka-slf4j" % "2.4.1"
)
Next, create the file app/actors/MentionsFetcher.scala with the following content:
package actors

import actors.MentionsFetcher._
import akka.actor.{ActorLogging, Actor}
import org.joda.time.DateTime
import scala.concurrent.duration._

class MentionsFetcher extends Actor with ActorLogging {

  val scheduler = context.system.scheduler.schedule(
    initialDelay = 5.seconds,
    interval = 10.minutes,
    receiver = self,
    message = CheckMentions
  )

  override def postStop(): Unit = {
    scheduler.cancel()
  }

  def receive = {
    case CheckMentions => checkMentions
    case MentionsReceived(mentions) => storeMentions(mentions)
  }

  def checkMentions = ???

  def storeMentions(mentions: Seq[Mention]) = ???
}

object MentionsFetcher {
  case object CheckMentions
  case class Mention(id: String, created_at: DateTime, text: String, from: String, users: Seq[User])
  case class User(handle: String, id: String)
  case class MentionsReceived(mentions: Seq[Mention])
}
The first thing you may notice from this code is the unimplemented methods checkMentions and storeMentions with the triple question mark ???. That’s actually valid Scala syntax: it is a method available by default which throws a scala.NotImplementedError.
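To make the behaviour of ??? concrete, here is a minimal, dependency-free sketch. The storeSomething method and the NotImplementedDemo object are hypothetical names used only for illustration and are not part of the project:

```scala
object NotImplementedDemo {
  // Hypothetical placeholder: it compiles for any return type,
  // because ??? has the type Nothing
  def storeSomething(values: Seq[String]): Int = ???

  // Calling the placeholder throws scala.NotImplementedError at runtime
  def attempt(): String =
    try {
      storeSomething(Seq("a", "b"))
      "implemented"
    } catch {
      case _: NotImplementedError => "not implemented yet"
    }
}
```

This is why the actor above compiles even though two of its methods have no body yet: the error only shows up once one of them is actually called.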
The second thing I want you to notice is the companion object to the MentionsFetcher class which holds the protocol of our actor. Actors communicate using messages and even though our actor will only communicate with itself in this example, it is a good idea to place the protocol in a companion object and to import its members (via the wildcard import actors.MentionsFetcher._) so as to keep things organized as the project grows.
Other than this, what we are doing for the moment is quite simple: we are setting up a scheduler that wakes up every 10 minutes in order to send the actor itself the CheckMentions message. Upon receiving this message in the main receive method we are going to proceed to fetching the mentions from Twitter. Finally, when a MentionsReceived message is received, we simply invoke the storeMentions method.
Simple enough, isn’t it? Don’t worry, things are about to get a little bit more complicated.
Fetching the mentions from Twitter
Twitter does not have an API that lets us directly fetch recent mentions. However it has an API that lets us search for Tweets and that will have to do.
Before you can go any further, if you intend to run this project, you will need to get yourself a set of keys and access tokens at apps.twitter.com. If you don’t you will have to trust me that the following works.
Once you have them, add them in the file conf/application.conf like so:
# Twitter
twitter.apiKey="..."
twitter.apiSecret="..."
twitter.accessToken="..."
twitter.accessTokenSecret="..."
Then, create the credentials method in MentionsFetcher:
// ...
import play.api.Play
import play.api.Play.current
import play.api.libs.oauth.{RequestToken, ConsumerKey}

class MentionsFetcher extends Actor with ActorLogging {

  // ...

  def credentials = for {
    apiKey <- Play.configuration.getString("twitter.apiKey")
    apiSecret <- Play.configuration.getString("twitter.apiSecret")
    token <- Play.configuration.getString("twitter.accessToken")
    tokenSecret <- Play.configuration.getString("twitter.accessTokenSecret")
  } yield (ConsumerKey(apiKey, apiSecret), RequestToken(token, tokenSecret))

}
This will allow us to place a call to Twitter’s API using the correct OAuth credentials.
Next, let’s get ready to fetch those mentions:
// ...
import akka.pattern.pipe
import org.joda.time.DateTime
import scala.concurrent.Future
import scala.util.control.NonFatal

class MentionsFetcher extends Actor with ActorLogging {

  // ...

  // timestamp of the last mention we have seen, kept in memory only
  var lastSeenMentionTime: Option[DateTime] = Some(DateTime.now)

  def checkMentions = {
    val maybeMentions = for {
      (consumerKey, requestToken) <- credentials
      time <- lastSeenMentionTime
    } yield fetchMentions(consumerKey, requestToken, "<yourTwitterHandleHere>", time)

    maybeMentions.foreach { mentions =>
      mentions.map { m =>
        MentionsReceived(m)
      } recover { case NonFatal(t) =>
        log.error(t, "Could not fetch mentions")
        MentionsReceived(Seq.empty)
      } pipeTo self
    }
  }

  def fetchMentions(consumerKey: ConsumerKey, requestToken: RequestToken, user: String, time: DateTime): Future[Seq[Mention]] = ???

}
Do you remember the pipe pattern we talked about in the previous post about Actors? Well, here it is again!
The call we are going to make against Twitter’s API is going to be asynchronous. In other words we will not simply get a Seq[Mention] but a Future[Seq[Mention]] to work with, and the best way to deal with that one is to send ourselves a message once the Future has completed with the contents of the result.
Since things can go wrong though we also need to think about error recovery which we do here by heroically logging out the fact that we could not fetch the mentions.
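The map and recover steps can be tried out without Akka. The following is a minimal sketch using only the standard library: MentionsReceived is simplified here to carry plain strings, the global ExecutionContext stands in for the actor's dispatcher, and the final pipeTo step is left out. All of these are assumptions made to keep the example self-contained.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.control.NonFatal

object RecoverSketch {
  // Simplified stand-in for the actor's message (hypothetical, strings only)
  case class MentionsReceived(mentions: Seq[String])

  // Turn the outcome of a fetch into a message, falling back to an
  // empty result when the Future fails with a non-fatal error
  def toMessage(fetch: Future[Seq[String]]): Future[MentionsReceived] =
    fetch.map(MentionsReceived(_)).recover {
      case NonFatal(_) => MentionsReceived(Seq.empty)
    }

  // Blocking with Await is for demonstration only; the actor pipes instead
  def successCase(): MentionsReceived =
    Await.result(toMessage(Future.successful(Seq("hello"))), 1.second)

  def failureCase(): MentionsReceived =
    Await.result(toMessage(Future.failed(new RuntimeException("boom"))), 1.second)
}
```

Either way the actor ends up receiving a MentionsReceived message, so its receive method does not need a separate branch for failures.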
You may also notice that we have introduced a lastSeenMentionTime variable. This is the means by which we are going to keep in memory the timestamp of the last mention we have seen.
In order to go ahead, one thing we need to do is to use a more recent version of the async-http-client library since there is a bug in Play 2.4.x. Add the following dependency to build.sbt:
libraryDependencies ++= Seq(
  ...
  "com.ning" % "async-http-client" % "1.9.29"
)
Alright, now that we are all set, let’s finally fetch those mentions!
// ...
import java.util.Locale
import scala.util.control.NonFatal
import org.joda.time.format.DateTimeFormat
import play.api.libs.json.JsArray
import play.api.libs.oauth.OAuthCalculator
import play.api.libs.ws.WS
import scala.concurrent.Future

class MentionsFetcher extends Actor with ActorLogging {

  // ...

  def fetchMentions(consumerKey: ConsumerKey, requestToken: RequestToken, user: String, time: DateTime): Future[Seq[Mention]] = {
    // format of the created_at field in Twitter's JSON
    val df = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss Z yyyy").withLocale(Locale.ENGLISH)

    WS.url("https://api.twitter.com/1.1/search/tweets.json")
      .sign(OAuthCalculator(consumerKey, requestToken))
      .withQueryString("q" -> s"@$user")
      .get()
      .map { response =>
        val mentions = (response.json \ "statuses").as[JsArray].value.map { status =>
          val id = (status \ "id_str").as[String]
          val text = (status \ "text").as[String]
          val from = (status \ "user" \ "screen_name").as[String]
          val created_at = df.parseDateTime((status \ "created_at").as[String])
          val userMentions = (status \ "entities" \ "user_mentions").as[JsArray].value.map { user =>
            User((user \ "screen_name").as[String], ((user \ "id_str").as[String]))
          }

          Mention(id, created_at, text, from, userMentions)
        }
        // only keep the mentions created after our last check
        mentions.filter(_.created_at.isAfter(time))
      }
  }

}
Fetching the mentions is rather straightforward thanks to Play’s WebService library. We create a signed OAuth request using our credentials and run an HTTP GET request against the search API passing as query string the @userName which will (hopefully) give us a list of Tweets mentioning a user. Lastly we only keep those mentions that are after our last check time. Since we check every 10 minutes and since the API only returns recent tweets, this should be doing fine (unless you are very popular on Twitter and get an insane amount of replies – but this really is your own fault, then).
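The parse-then-filter step can be tried out in isolation. The sketch below reuses the date pattern from fetchMentions but swaps Joda-Time for java.time so that it runs without extra dependencies. That swap, the CreatedAtSketch object and the sample timestamps are assumptions for illustration, not part of the project.

```scala
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
import java.util.Locale

object CreatedAtSketch {
  // Same pattern as in fetchMentions, applied with java.time instead of Joda-Time
  val format: DateTimeFormatter =
    DateTimeFormatter.ofPattern("EEE MMM dd HH:mm:ss Z yyyy", Locale.ENGLISH)

  def parse(createdAt: String): OffsetDateTime =
    OffsetDateTime.parse(createdAt, format)

  // Keep only the timestamps that are strictly after the last check time
  def newerThan(lastCheck: OffsetDateTime, createdAts: Seq[String]): Seq[OffsetDateTime] =
    createdAts.map(parse).filter(_.isAfter(lastCheck))
}
```

For instance, with a last check at "Fri Dec 11 13:50:00 +0000 2015", a tweet created at 13:45:00 on the same day is dropped while one created at 13:55:08 is kept.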
Setting the ExecutionContext
If you try to compile the project now (using the compile command) you will be greeted with a few compilation errors complaining about a missing ExecutionContext. Futures are a way to abstract tasks and they need something to run them. The ExecutionContext is the missing bit which will schedule the tasks to be executed.
Since we are inside of an actor we can borrow the actor’s own dispatcher:
class MentionsFetcher extends Actor with ActorLogging {

  implicit val executionContext = context.dispatcher

  // ...
}
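To see the role of the ExecutionContext in isolation, here is a minimal, dependency-free sketch. It does not use the actor's dispatcher: a small fixed thread pool (an assumption for illustration) takes its place, and ExecutionContextSketch is a hypothetical name.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ExecutionContextSketch {
  def run(): Int = {
    val pool = Executors.newFixedThreadPool(2)
    // Without an implicit ExecutionContext in scope, the Future and map
    // calls below would not compile
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // Both the initial computation and the map callback run on the pool
      val result = Future(20 + 1).map(_ * 2)
      Await.result(result, 1.second) // blocking only for demonstration
    } finally {
      pool.shutdown()
    }
  }
}
```

Inside the actor, context.dispatcher plays the role of the pool, which is why declaring it as an implicit value is enough to satisfy the compiler.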
We’ll talk more about Execution Contexts later on when it comes to fine-tuning the connection with the database. For the moment let us focus on actually getting to talk with the database at all.
Setting up a reactive database connection
Configuring the database connection
In order to connect to the database, we will first need to configure the connection information in conf/application.conf like so:
// ...
db.default.driver="org.postgresql.Driver"
db.default.url="jdbc:postgresql://localhost/mentions?user=play"
Creating a helper class to access the database
Play’s Database API lets us access the configured database. We now need to do two things:
- use jOOQ (rather than plain JDBC) to talk with the database
- make sure we are not going to jeopardize our application by blocking while waiting for the database interaction to happen (JDBC is blocking)
For this purpose we will wrap the database operations in a Future that will run on its own ExecutionContext rather than sharing the one used by the actor or by the Play application itself.
Go ahead and create the file app/database/DB.scala:
package database

import javax.inject.Inject

import akka.actor.ActorSystem
import org.jooq.{SQLDialect, DSLContext}
import org.jooq.impl.DSL
import play.api.db.Database

import scala.concurrent.{ExecutionContext, Future}

class DB @Inject() (db: Database, system: ActorSystem) {

  val databaseContext: ExecutionContext = system.dispatchers.lookup("contexts.database")

  def query[A](block: DSLContext => A): Future[A] = Future {
    db.withConnection { connection =>
      val sql = DSL.using(connection, SQLDialect.POSTGRES_9_4)
      block(sql)
    }
  }(databaseContext)

  def withTransaction[A](block: DSLContext => A): Future[A] = Future {
    db.withTransaction { connection =>
      val sql = DSL.using(connection, SQLDialect.POSTGRES_9_4)
      block(sql)
    }
  }(databaseContext)

}
We define two methods, query and withTransaction, that:
- use a Future block in order to wrap the underlying code as a Future, thus running it asynchronously
- use a custom databaseContext ExecutionContext in order to execute this Future
- initialize jOOQ’s DSLContext and give access to it in the body of the expected functions
The databaseContext ExecutionContext is created using Akka’s configuration capabilities. We need to add the configuration of the database dispatcher in conf/application.conf:
contexts {
  database {
    fork-join-executor {
      parallelism-max = 9
    }
  }
}
The magic number 9 doesn’t come out of nowhere. Check the excellent explanation provided by the HikariCP connection pool about connection pool sizing for more details. Those considerations are also discussed at length in Chapters 5 and 7 of Reactive Web Applications.
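For reference, the rule of thumb given in HikariCP's pool sizing article is connections = (core_count * 2) + effective_spindle_count. The sketch below shows one way to arrive at 9 with that formula, assuming a machine with 4 cores and a single disk. These hardware values are an assumption, since the article does not state them, and PoolSizing is a hypothetical helper.

```scala
object PoolSizing {
  // Rule of thumb from HikariCP's pool sizing article:
  // connections = (core_count * 2) + effective_spindle_count
  def poolSize(coreCount: Int, effectiveSpindleCount: Int): Int =
    coreCount * 2 + effectiveSpindleCount

  // Assumed hardware: 4 cores and 1 disk
  val assumedSize: Int = poolSize(coreCount = 4, effectiveSpindleCount = 1)
}
```

Matching the number of threads in the database dispatcher to the number of connections means that a thread never has to wait for a connection to become available.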
Wiring everything using dependency injection
Next, let’s use Play’s built-in dependency injection mechanism in order to provide our MentionsFetcher actor with a DB class. Adjust the constructor of our MentionsFetcher actor in app/actors/MentionsFetcher.scala to look like so:
// ...
import javax.inject.Inject
import database.DB

// inject the DB helper we have just created rather than Play's Database
class MentionsFetcher @Inject() (db: DB) extends Actor with ActorLogging { ... }
We just need one more thing in order to bootstrap our MentionsFetcher actor: let Play know that we want to use it.
For this purpose we will declare a module and leverage the plumbing that Play provides when it comes to interacting with Akka actors. At the end of MentionsFetcher.scala (or in a new file, if you like), declare the following MentionsFetcherModule:
import com.google.inject.AbstractModule
import play.api.libs.concurrent.AkkaGuiceSupport

class MentionsFetcherModule extends AbstractModule with AkkaGuiceSupport {
  def configure(): Unit =
    bindActor[MentionsFetcher]("fetcher")
}
Last but not least we need to tell Play that we would like to use this module. In conf/application.conf add the following line to do so:
play.modules.enabled += "actors.MentionsFetcherModule"
That’s it! When Play starts up it will initialize the enabled modules which in turn will lead to the actor being initialized.
We now can go ahead and use the database in order to store the fetched mentions.
Storing the mentions in the database
Thanks to jOOQ, writing the statements for storing the mentions is rather easy. Since we do not want to risk storing users or mentions twice, we will upsert them using the WHERE NOT EXISTS SQL clause. For the sake of recording as much data as possible we will also store all mentioned users of a Tweet.
// ...
import java.sql.Timestamp
import database.DB
import generated.Tables._
import org.jooq.impl.DSL._

class MentionsFetcher @Inject() (db: DB) extends Actor with ActorLogging {

  // ...

  def storeMentions(mentions: Seq[Mention]) = db.withTransaction { sql =>
    log.info("Inserting potentially {} mentions into the database", mentions.size)
    val now = new Timestamp(DateTime.now.getMillis)

    // insert the user only if no row with this handle exists yet
    def upsertUser(handle: String) = {
      sql.insertInto(TWITTER_USER, TWITTER_USER.CREATED_ON, TWITTER_USER.TWITTER_USER_NAME)
        .select(
          select(value(now), value(handle))
            .whereNotExists(
              selectOne()
                .from(TWITTER_USER)
                .where(TWITTER_USER.TWITTER_USER_NAME.equal(handle))
            )
        )
        .execute()
    }

    mentions.foreach { mention =>
      // upsert the mentioning users
      upsertUser(mention.from)

      // upsert the mentioned users
      mention.users.foreach { user =>
        upsertUser(user.handle)
      }

      // upsert the mention
      sql.insertInto(MENTIONS, MENTIONS.CREATED_ON, MENTIONS.TEXT, MENTIONS.TWEET_ID, MENTIONS.USER_ID)
        .select(
          select(
            value(now),
            value(mention.text),
            value(mention.id),
            TWITTER_USER.ID
          )
          .from(TWITTER_USER)
          .where(TWITTER_USER.TWITTER_USER_NAME.equal(mention.from))
          .andNotExists(
            selectOne()
              .from(MENTIONS)
              .where(MENTIONS.TWEET_ID.equal(mention.id))
          )
        )
        .execute()
    }
  }

}
Et voilà! If you execute this code (and generate some mentions, or use an earlier timestamp for filtering) you will get some data into your database!
Let’s now query and display a few statistics in the browser.
Displaying the mentions
In order to show our mentions we will adjust the default view shown when launching the application as well as the Application controller. Start by adjusting the template app/views/index.scala.html to look as follows:
@(mentionsCount: Int)

@main("Reactive mentions") {
  <p>You have been mentioned @mentionsCount times in the past days</p>
}
Next, edit the Application controller located in app/controllers/Application.scala:
package controllers

import java.sql.Timestamp
import javax.inject.Inject

import database.DB
import org.joda.time.DateTime
import play.api._
import play.api.mvc._

class Application @Inject() (db: DB) extends Controller {

  def index = Action.async { implicit request =>

    import generated.Tables._
    import org.jooq.impl.DSL._

    db.query { sql =>
      val mentionsCount =
        sql.select(
          count()
        ).from(MENTIONS)
         .where(
           MENTIONS.CREATED_ON.gt(value(new Timestamp(DateTime.now.minusDays(1).getMillis)))
         )
         // read the computed count from the single result row;
         // execute() would not return the value of count()
         .fetchOne().value1()

      Ok(views.html.index(mentionsCount))
    }

  }

}
This time, we are using the query method that we have built in our DB helper. Since the result of this operation is a Future, we need to use the Action.async method of the Action, which has a signature of the kind Request => Future[Result]. The execution of this query is performed by the custom ExecutionContext that we have set up for database operations and does not impede the default ExecutionContext of the Play framework itself.
In case anything were to go wrong and the database operations were to hang on forever on the threads offered by that context, the rest of the application would not be affected (this principle is called “bulkheading” and is described a bit more in detail in Chapter 5 of Reactive Web Applications).
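The bulkheading idea can be demonstrated with the standard library alone. In the following sketch, two single-threaded pools stand in for the database context and for Play's default context, and a latch simulates a database call that hangs. The pool sizes, the latch and the BulkheadSketch name are assumptions for illustration.

```scala
import java.util.concurrent.{CountDownLatch, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BulkheadSketch {
  def run(): String = {
    // Two separate pools: one for "database" work, one for everything else
    val databaseContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
    val defaultContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
    val release = new CountDownLatch(1)
    try {
      // Simulated hanging database call: occupies the only database thread
      Future { release.await() }(databaseContext)
      // Work on the other context still completes within the timeout
      Await.result(Future { "still responsive" }(defaultContext), 1.second)
    } finally {
      release.countDown() // let the simulated call finish
      databaseContext.shutdown()
      defaultContext.shutdown()
    }
  }
}
```

Had both Futures shared a single one-thread pool, the second one would have waited behind the hanging call and the Await would have timed out.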
Conclusion
In this series we have explored the “Why?” of reactive applications and of asynchronous programming. In particular, we have talked about Futures and Actors, two powerful abstractions that make asynchronous programming easier to think about.
Most relational databases do not have asynchronous drivers available yet and even if there are some projects aiming at it I think it will still take some time before we’ll have a standard that will hopefully be implemented by many vendors. In the meanwhile we have seen that we can use a custom ExecutionContext in order to isolate otherwise blocking database operations.
If you liked this series and are interested in learning more on the topic, consider checking out my book which provides an introduction to building reactive web applications on the JVM. Futures are covered in Chapter 5, Actors in Chapter 6 and Database Access in Chapter 7.
Reference: Reactive Database Access – Part 3 – Using jOOQ with Scala, Futures and Actors from our JCG partner Lukas Eder at the JAVA, SQL, AND JOOQ blog.