Listen to notifications from Postgresql with Scala
In the past I’ve written a couple of articles (Building a REST service in Scala with Akka HTTP, Akka Streams and reactive mongo and ReactiveMongo with Akka, Scala and websockets) which used MongoDB to push updates directly from the database to a Scala application. This is a very nice feature if you just want to subscribe your application to a stream of events where it doesn’t really matter if you miss one while your application is down. While MongoDB is a great database, it isn’t the right fit for all purposes. Sometimes you want a relational database with a well-defined schema, or a database that can combine the SQL and NoSQL worlds. Personally I’ve always really liked PostgreSQL. It’s one of the best relational databases, has great GIS support (which I like a lot), and is getting more and more JSON/schema-less support (which I need to dive into sometime). One of the features of PostgreSQL I didn’t know about was that it provides a kind of subscribe mechanism. I learned about it when reading the “Listening to generic JSON notifications from PostgreSQL in Go” article, which shows how to use this from Go. In this article we’ll see what you need to do to get something similar working in Scala (the approach for Java is pretty much the same).
How does this work in PostgreSQL
It is actually very easy to listen to notifications in PostgreSQL. All you have to do is the following:
LISTEN virtual;
NOTIFY virtual;
Asynchronous notification "virtual" received from server process with PID 8448.
NOTIFY virtual, 'This is the payload';
Asynchronous notification "virtual" with payload "This is the payload" received from server process with PID 8448.
The connection that wants to listen to events calls LISTEN with the name of the channel it wants to listen on. And the sending connection just runs NOTIFY with the name of the channel, and a possible payload.
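To make this concrete from the JVM side, here is a minimal, hypothetical sketch using plain JDBC (the connection URL and credentials are placeholders, and the PostgreSQL JDBC driver is assumed to be on the classpath) that issues LISTEN and then polls the driver for pending notifications:

```scala
import java.sql.DriverManager
import org.postgresql.PGConnection

object MinimalListener extends App {
  // hypothetical connection details; adjust to your own setup
  val conn = DriverManager.getConnection(
    "jdbc:postgresql://localhost:5432/triggers", "user", "password")

  // tell PostgreSQL this connection wants notifications on the "virtual" channel
  val stmt = conn.createStatement()
  stmt.execute("LISTEN virtual")
  stmt.close()

  // a raw driver connection is a PGConnection, which exposes getNotifications
  val pgConn = conn.asInstanceOf[PGConnection]
  while (true) {
    // getNotifications returns null when nothing is pending
    Option(pgConn.getNotifications).getOrElse(Array.empty).foreach { n =>
      println(s"channel=${n.getName} payload=${n.getParameter}")
    }
    Thread.sleep(500)
  }
}
```

One caveat worth knowing: getNotifications only surfaces notifications the driver has already read from the socket, so depending on the driver version you may need to issue a dummy query (e.g. SELECT 1) on each poll to make the driver pick up pending data.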
Preparing the database
The cool thing from the Go article mentioned in the introduction is that it provides a stored procedure which automatically sends a notification whenever a table row is INSERTed, UPDATEd, or DELETEd. The following, taken from Listening to generic JSON notifications from PostgreSQL in Go, creates a stored procedure which sends notifications when called.
CREATE OR REPLACE FUNCTION notify_event() RETURNS TRIGGER AS $$

    DECLARE
        data json;
        notification json;

    BEGIN

        -- Convert the old or new row to JSON, based on the kind of action.
        -- Action = DELETE?             -> OLD row
        -- Action = INSERT or UPDATE?   -> NEW row
        IF (TG_OP = 'DELETE') THEN
            data = row_to_json(OLD);
        ELSE
            data = row_to_json(NEW);
        END IF;

        -- Construct the notification as a JSON string.
        notification = json_build_object(
                          'table', TG_TABLE_NAME,
                          'action', TG_OP,
                          'data', data);

        -- Execute pg_notify(channel, notification)
        PERFORM pg_notify('events', notification::text);

        -- Result is ignored since this is an AFTER trigger
        RETURN NULL;
    END;

$$ LANGUAGE plpgsql;
The really cool thing about this stored procedure is that the data is converted to JSON, so we can easily process it in our application. For this example I’ll use the same table and data as the Go article, so first create a table:
CREATE TABLE products ( id SERIAL, name TEXT, quantity FLOAT );
And create a trigger whenever something happens to the table.
CREATE TRIGGER products_notify_event AFTER INSERT OR UPDATE OR DELETE ON products FOR EACH ROW EXECUTE PROCEDURE notify_event();
At this point, whenever a row is inserted, updated or deleted on the products table, a notify event is created. We can simply test this from the psql command line:
triggers=# LISTEN events;
LISTEN
triggers=# INSERT INTO products(name, quantity) VALUES ('Something', 99999);
INSERT 0 1
Asynchronous notification "events" with payload "{"table" : "products", "action" : "INSERT", "data" : {"id":50,"name":"Something","quantity":99999}}" received from server process with PID 24131.
triggers=#
As you can see, the INSERT resulted in an asynchronous event which contains the data. So far we’ve pretty much followed the steps outlined in the Go article. Now let’s look at how we can access the notifications from Scala.
Accessing notifications from Scala
First let’s set up our project’s dependencies. As always we use SBT. The build.sbt for this project looks like this:
name := "postgresql-notifications"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.postgresql"    %  "postgresql"    % "9.4-1200-jdbc41",
  "org.scalikejdbc"   %% "scalikejdbc"   % "2.2.8",
  "com.typesafe.akka" %% "akka-actor"    % "2.4-SNAPSHOT",
  "org.json4s"        %% "json4s-native" % "3.2.10"
)

resolvers += "Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
A quick summary of the dependencies:
- scalikeJDBC: This project provides an easy-to-use wrapper around JDBC, so we don’t have to deal with Java-style connection handling ourselves.
- akka: We use the Akka framework to manage the connection with the database. Since the JDBC driver isn’t asynchronous and can’t push data, we need to poll at an interval.
- json4s: This is just a simple Scala JSON library. We use this to quickly convert the incoming data into a simple case class.
We’ll first show you the complete source code for this example, and then explain the various parts:
import akka.actor.{Props, ActorSystem, Actor}
import org.apache.commons.dbcp.{PoolingDataSource, DelegatingConnection}
import org.json4s.DefaultFormats
import org.postgresql.{PGNotification, PGConnection}
import scalikejdbc._
import org.json4s.native.JsonMethods._
import scala.concurrent.duration._

/**
 * Simple case class to marshall to from received event.
 */
case class Product(id: Long, name: String, quantity: Long)

/**
 * Main runner. Just sets up the connection pool and the actor system.
 */
object PostgresNotifications extends App {

  // initialize JDBC driver & connection pool
  Class.forName("org.postgresql.Driver")
  ConnectionPool.singleton("jdbc:postgresql://localhost:5432/triggers", "jos", "######")
  ConnectionPool.dataSource().asInstanceOf[PoolingDataSource].setAccessToUnderlyingConnectionAllowed(true)

  // initialize the actor system
  val system = ActorSystem("Hello")
  val a = system.actorOf(Props[Poller], "poller")

  // wait for the user to stop the server
  println("Press <enter> to exit.")
  Console.in.read.toChar
  system.terminate
}

class Poller extends Actor {

  // execution context for the ticks
  import context.dispatcher

  val connection = ConnectionPool.borrow()
  val db: DB = DB(connection)
  val tick = context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick")

  override def preStart() = {
    // make sure the connection isn't closed when executing queries
    db.autoClose(false)
    db.localTx { implicit session =>
      sql"LISTEN events".execute().apply()
    }
  }

  override def postStop() = {
    tick.cancel()
    db.close()
  }

  def receive = {
    case "tick" => {
      db.readOnly { implicit session =>
        val pgConnection = connection.asInstanceOf[DelegatingConnection].getInnermostDelegate.asInstanceOf[PGConnection]
        val notifications = Option(pgConnection.getNotifications).getOrElse(Array[PGNotification]())

        notifications.foreach { not =>
          println(s"Received for: ${not.getName} from process with PID: ${not.getPID}")
          println(s"Received data: ${not.getParameter} ")

          // convert to object
          implicit val formats = DefaultFormats
          val json = parse(not.getParameter) \\ "data"
          val prod = json.extract[Product]
          println(s"Received as object: $prod\n")
        }
      }
    }
  }
}
If you’re familiar with Akka and with scalikeJDBC the code will look familiar. We start with some general setup stuff:
/**
 * Simple case class to marshall to from received event.
 */
case class Product(id: Long, name: String, quantity: Long)

/**
 * Main runner. Just sets up the connection pool and the actor system.
 */
object PostgresNotifications extends App {

  // initialize JDBC driver & connection pool
  Class.forName("org.postgresql.Driver")
  ConnectionPool.singleton("jdbc:postgresql://localhost:5432/triggers", "jos", "######")
  ConnectionPool.dataSource().asInstanceOf[PoolingDataSource].setAccessToUnderlyingConnectionAllowed(true)

  // initialize the actor system
  val system = ActorSystem("Hello")
  val a = system.actorOf(Props[Poller], "poller")

  // wait for the user to stop the server
  println("Press <enter> to exit.")
  Console.in.read.toChar
  system.terminate
}
Here we define our case class to which we’ll transform the incoming JSON, set up a connection pool, define the actor system and start our Poller actor. Nothing too special here; the only special part is the call to setAccessToUnderlyingConnectionAllowed. To add a listener from Scala we need access to the underlying JDBC connection. Since scalikeJDBC uses connection pooling, we need to explicitly call setAccessToUnderlyingConnectionAllowed to make sure we’re allowed to access the actual connection when we call getInnermostDelegate, and not just the wrapped one from the connection pool. Interesting to note here: if we don’t set this, we don’t get an error message or anything, getInnermostDelegate simply returns null.
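Since nothing in the types forces you to remember that flag, a small defensive helper (a sketch, using the same DBCP classes as the example above) can fail fast with a clear message instead of a puzzling NullPointerException later:

```scala
import org.apache.commons.dbcp.DelegatingConnection
import org.postgresql.PGConnection

// sketch: unwrap a pooled connection, failing fast if the pool was
// not configured with setAccessToUnderlyingConnectionAllowed(true)
def unwrapPgConnection(connection: java.sql.Connection): PGConnection =
  connection.asInstanceOf[DelegatingConnection].getInnermostDelegate match {
    case pg: PGConnection => pg
    case null => sys.error(
      "underlying connection is null; did you call setAccessToUnderlyingConnectionAllowed(true)?")
    case other => sys.error(s"unexpected connection type: ${other.getClass}")
  }
```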
With this out of the way, and our actor started, let’s see what it does:
class Poller extends Actor {

  // execution context for the ticks
  import context.dispatcher

  val connection = ConnectionPool.borrow()
  val db: DB = DB(connection)
  val tick = context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick")

  override def preStart() = {
    // make sure the connection isn't closed when executing queries
    db.autoClose(false)
    db.localTx { implicit session =>
      sql"LISTEN events".execute().apply()
    }
  }

  override def postStop() = {
    tick.cancel()
    db.close()
  }

  def receive = {
    case "tick" => {
      db.readOnly { implicit session =>
        val pgConnection = connection.asInstanceOf[DelegatingConnection].getInnermostDelegate.asInstanceOf[PGConnection]
        val notifications = Option(pgConnection.getNotifications).getOrElse(Array[PGNotification]())

        notifications.foreach { not =>
          println(s"Received for: ${not.getName} from process with PID: ${not.getPID}")
          println(s"Received data: ${not.getParameter} ")

          // convert to object
          implicit val formats = DefaultFormats
          val json = parse(not.getParameter) \\ "data"
          val prod = json.extract[Product]
          println(s"Received as object: $prod\n")
        }
      }
    }
  }
}
The first thing we do in our actor is set some properties needed by scalikeJDBC, and set up a timer which, after an initial delay of 500 ms, sends a "tick" message to the actor every second. Also note the preStart and postStop functions. In preStart we execute a small piece of SQL which tells PostgreSQL that this connection will be listening for notifications on the “events” channel. We also set db.autoClose to false, to avoid the session pooling mechanism closing the session and connection; we want to keep these alive so we can keep receiving events. When the actor is terminated we make sure to clean up the timer and the connection.
In the receive function we first get the real PGConnection and then get the notifications from the connection:
val pgConnection = connection.asInstanceOf[DelegatingConnection].getInnermostDelegate.asInstanceOf[PGConnection]
val notifications = Option(pgConnection.getNotifications).getOrElse(Array[PGNotification]())
If there are no notifications, null is returned, so we wrap the result in an Option and fall back to an empty array in the null case. If there are any notifications we process them in a foreach loop and print out the result:
implicit val formats = DefaultFormats
val json = parse(not.getParameter) \\ "data"
val prod = json.extract[Product]
println(s"Received as object: $prod\n")
Here you can also see that we just get the “data” element from the notification, and convert it to our Product class for further processing. All you have to do now is start the application and from the same psql terminal insert some rows. If all went well, you’ll see output similar to this in your console:
Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":47,"name":"pen","quantity":10200}}
Received as object: Product(47,pen,10200)

Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":48,"name":"pen","quantity":10200}}
Received as object: Product(48,pen,10200)

Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":49,"name":"pen","quantity":10200}}
Received as object: Product(49,pen,10200)

Received for: events from process with PID: 24131
Received data: {"table" : "products", "action" : "INSERT", "data" : {"id":50,"name":"Something","quantity":99999}}
Received as object: Product(50,Something,99999)
Now that you’ve got this basic construct working it’s trivial to use this, for instance, as a source for reactive streams, or just use websockets to further propagate these events.
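As a rough sketch of that idea (assuming an additional akka-stream dependency, which is not in the build.sbt above), the same polling loop can be driven from an Akka Streams source, giving you a backpressured stream of payloads:

```scala
import akka.stream.scaladsl.Source
import org.postgresql.{PGConnection, PGNotification}
import scala.concurrent.duration._

object NotificationSource {

  // drain whatever notifications are currently pending on the connection
  def poll(pgConnection: PGConnection): List[PGNotification] =
    Option(pgConnection.getNotifications).map(_.toList).getOrElse(Nil)

  // a source that polls on the same schedule as the Poller actor
  // and emits the JSON payload of each notification
  def notifications(pgConnection: PGConnection): Source[String, _] =
    Source.tick(500.millis, 1.second, ())
      .mapConcat(_ => poll(pgConnection)) // flatten each batch of notifications
      .map(_.getParameter)                // keep just the JSON payload

  // usage sketch, given a pgConnection obtained as in the actor above:
  // notifications(pgConnection).runForeach(println)
}
```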
Reference: Listen to notifications from Postgresql with Scala from our JCG partner Jos Dirksen at the Smart Java blog.