RabbitMQ: Scheduled Message Delivery
The main idea behind this is utilizing a new feature available in 2.8.0, dead-letter exchanges. This AMQP extension allows you to specify an exchange on a queue that messages should be published to when a message either expires or is rejected with requeue set to false.
With this in mind, we can simply create a queue for messages we want to be delivered later with an x-message-ttl set to the duration we want to wait before it is delivered. And to ensure the message is transferred to another queue we simply define the x-dead-letter-exchange to an exchange we created (in this case I’ll call it immediate) and bind a queue to it (the “right.now.queue”).
In coffeescript with node-amqp this looks like this:
amqp = require 'amqp' conn = amqp.createConnection() key = "send.later.#{new Date().getTime()}" conn.on 'ready', ->' conn.queue key, { arguments:{ "x-dead-letter-exchange":"immediate" , "x-message-ttl": 5000 } }
Next I define the immediate exchange, bind a queue to it and subscribe.
conn.exchange 'immediate' conn.queue 'right.now.queue', {autoDelete: false, durable: true}, (q) -> q.bind('immediate', 'right.now.queue') q.subscribe (msg, headers, deliveryInfo) -> console.log msg console.log headers
Finally, after defining the queue I created earlier we want publish a message on it. So to revisit the earlier queue definition we add a publish call to publish directly to the queue (using the default exchange).
conn.on 'ready', -> conn.queue key, { arguments:{ "x-dead-letter-exchange":"immediate" , "x-message-ttl": 5000 } }, -> conn.publish key, {v:1}, {contentType:'application/json'}
The result of running this is we’ll see a 5 second wait and then the message content and headers get dumped to the console. Since the queue is only used temporarily in this scenario I also set the x-expires attribute of the queue to expire in a reasonable amount of time after the message expires. This makes sure we don’t wind up with a ton of unused queues just sitting around.
Here’s the result of this exercise in its entirety.
amqp = require 'amqp' events = require 'events' em = new events.EventEmitter() conn = amqp.createConnection() key = "send.later.#{new Date().getTime()}" conn.on 'ready', -> conn.queue key, { arguments:{ "x-dead-letter-exchange":"immediate" , "x-message-ttl": 5000 , "x-expires": 6000 } }, -> conn.publish key, {v:1}, {contentType:'application/json'} conn.exchange 'immediate' conn.queue 'right.now.queue', { autoDelete: false , durable: true }, (q) -> q.bind('immediate', 'right.now.queue') q.subscribe (msg, headers, deliveryInfo) -> console.log msg console.log headers
You can get this exercise in full on github.
This is pretty interesting and I plan to experiment further with utilizing this in one of my production node.js applications that use interval based polling to trigger scheduled events.
Reference: Scheduled Message Delivery with RabbitMQ from our JCG partner James Carr at the Rants and Musings of an Agile Developer blog.
yes you can.
The dead-lettering process adds an array to the header of each dead-lettered message named x-death. This array contains an entry for each time the message was dead-lettered. Each such entry is a table that consists of several fields, one of them is “reason” though which you can distinguish between expired & rejected:
So will this delayed queue replace the existing dead-letter-queue? If there is a system with 50 queues, and I want to setup delayed messaging on one of those queues, can I do that without having all the expired messages landing up into this new queue?