How to create a pub/sub application with MongoDB ? Introduction
In this article we will see how to create a pub/sub application (messaging, chat, notification), and this fully based on MongoDB (without any message broker like RabbitMQ, JMS, … ).
So, what needs to be done to achieve such thing:
- an application “publish” a message. In our case, we simply save a document into MongoDB
- another application, or thread, subscribe to these events and will received message automatically. In our case this means that the application should automatically receive newly created document out of MongoDB
All this is possible with some very cool MongoDB features : capped collections and tailable cursors.
Capped Collections and Tailable Cursors
As you can see in the documentation, Capped Collections are fixed sized collections, that work in a way similar to circular buffers: once a collection fills its allocated space, it makes room for new documents by overwriting the oldest documents.
MongoDB Capped Collections can be queried using Tailable Cursors, that are similar to the unix tail -f command. Your application continue to retrieve documents as they are inserted into the collection. I also like to call this a “continuous query”. Now that we have seen the basics, let’s implement it.
Building a very basic application
Create the collection
The first thing to do is to create a new capped collection :
$> mongo use chat db.messages.drop() db.createCollection('messages', { capped: true, size: 10000 }) db.messages.insert({"type":"init"});
For simplicity, I am using the MongoDB Shell to create the messages collection in the chat database.
You can see on line #7 how to create a capped collection, with 2 options:
- capped : true : this one is obvious
- size : 10000 : this is a mandatory option when you create a capped collection. This is the maximum size in bytes. (will be raised to a multiple of 256)
Finally, on line #9, I insert a dummy document, this is also mandatory to be able to get the tailable cursor to work.
Write an application
Now that we have the collection, let’s write some code. First in node.js:
var mongo = require("mongodb"); var mongodbUri = "mongodb://127.0.0.1/chat"; mongo.MongoClient.connect (mongodbUri, function (err, db) { db.collection('messages', function(err, collection) { // open a tailable cursor console.log("== open tailable cursor"); collection.find({}, {tailable:true, awaitdata:true, numberOfRetries:-1}) .sort({ $natural: 1 }) .each(function(err, doc) { console.log(doc); }) }); });
From lines #1 to 5 I just connect to my local MongoDB instance.
Then on line #7, I get the messages collection.
And on line #10, I execute a find, using a tailable cursor, using specific options:
- {} : no filter, so all documents will be returned
- tailable : true : this one is clear, to say that we want to create a tailable cursor
- awaitdata : true : to say that we wait for data before returning no data to the client
- numberOfRetries : -1 : The number of times to retry on time out, -1 is infinite, so the application will keep trying
Line #11 just force the sort to the natural order, then on line #12, the cursor returns the data, and the document is printed in the console each time it is inserted.
Test the Application
Start the application:
node app.js
Insert documents in the messages collection, from the shell or any other tool. You can find below a screencast showing this very basic application working:
https://www.youtube.com/watch?v=uSuiYvssKuo
The source code of this sample application in this Github repository, take the step-01 branch; clone this branch using:
git clone -b step-01 https://github.com/tgrall/mongodb-realtime-pubsub.git
I have also created a gist showing the same behavior in Java:
package org.mongodb.demos.tailable; import com.mongodb.*; public class MyApp { public static void main(String[] args) throws Exception { MongoClient mongoClient = new MongoClient(); DBCollection coll = mongoClient.getDB("chat").getCollection("messages"); DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get()) .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA); System.out.println("== open cursor =="); Runnable task = () -> { System.out.println("\tWaiting for events"); while (cur.hasNext()) { DBObject obj = cur.next(); System.out.println( obj ); } }; new Thread(task).start(); } }
Mathieu Ancelin has written it in Scala:
package org.mongodb.demos.tailable import reactivemongo.api._ import reactivemongo.bson._ import play.api.libs.iteratee.Iteratee import scala.concurrent.ExecutionContext.Implicits.global import reactivemongo.api.collections.default.BSONCollection object Capped extends App { val driver = new MongoDriver val connection = driver.connection(List("localhost")) val db = connection("chat") val collection = db.collection[BSONCollection]("messages") val cursor = collection .find(BSONDocument()) .options(QueryOpts().tailable.awaitData) .cursor[BSONDocument] println("== open tailable cursor") cursor.enumerate().apply(Iteratee.foreach { doc => println(s"Document inserted: ${BSONDocument.pretty(doc)}") }) }
Add some user interface
We have the basics of a publish subscribe based application:
- publish by inserting document into MongoDB
- subscribe by reading document using a tailable cursor
Let’s now push the messages to a user using for example socket.io. For this we need to:
- add socket.io dependency to our node project
- add HTML page to show messages
The following gists shows the updated version of the app.js and index.html, let’s take a look:
"use strict"; var mongo = require("mongodb"), fs = require("fs"), // to read static files io = require("socket.io"), // socket io server http = require("http"); var mongodbUri = "mongodb://127.0.0.1/chat"; var app = http.createServer(handler); io = io.listen(app); app.listen(3000); console.log("http server on port 3000"); function handler(req, res){ fs.readFile(__dirname + "/index.html", function (err, data) { res.writeHead(200); res.end(data); }); } mongo.MongoClient.connect (mongodbUri, function (err, db) { db.collection('messages', function(err, collection) { // open socket io.sockets.on("connection", function (socket) { // open a tailable cursor console.log("== open tailable cursor"); collection.find({}, {tailable:true, awaitdata:true, numberOfRetries:-1}).sort({ $natural: 1 }).each(function(err, doc) { console.log(doc); // send message to client if (doc.type == "message") { socket.emit("message",doc); } }) }); }); });
The node application has been updated with the following features:
- lines #4-7: import of http, file system and socket.io
- lines #10-21: configure and start the http server. You can see that I have created a simple handler to serve static html file
- lines #28-39: I have added support to Web socket using socket.io where I open the tailable cursor, and push/emit the messages on the socket.
As you can see, the code that I have added is simple. I do not use any advanced framework, nor manage exceptions, this for simplicity and readability.
Let’s now look at the client (html page).
<!doctype html> <html> <head> <title>MongoDB pub/sub</title> <style> * { margin: 0; padding: 10px; box-sizing: border-box; } body { font: 13px Helvetica, Arial; } #messages { list-style-type: none; margin: 0; padding: 0; } #messages li { padding: 5px 10px; } #messages li:nth-child(odd) { background: #eee; } </style> </head> <body> <h2>MongoDB/Socket.io demonstration</h2> <ul id="messages"></ul> <script src="https://cdn.socket.io/socket.io-1.2.0.js"></script> <script src="https://www.javacodegeeks.com/wp-content/litespeed/localres/aHR0cHM6Ly9jb2RlLmpxdWVyeS5jb20vjquery-2.1.3.min.js"></script> <script> var socket = io(); socket.on('message', function(doc){ $('#messages').append($('<li>').text(doc.text)); }); </script> </body> </html>
Same as the server, it is really simple and does not use any advanced libraries except socket.io client (line #18) and JQuery (line #19), and used:
- on line #22 to received messages ans print them in the page using JQuery on line #23
I have created a screencast of this version of the application:
You can find the source code in this Github repository, take the step-02 branch; clone this branch using:
git clone -b step-02 https://github.com/tgrall/mongodb-realtime-pubsub.git
Conclusion
In this first post, we have:
- learned about tailable cursor and capped collection
- see how it can be used to develop a pub/sub application
- expose this into a basic web socket based application
Reference: | How to create a pub/sub application with MongoDB ? Introduction from our JCG partner Tugdual Grall at the Tug’s Blog blog. |