RabbitMQ retries using Spring Integration
I recently read about an approach to retrying with RabbitMQ (linked in the reference at the end of this post) and wanted to try a similar approach with
Spring Integration, which provides an awesome set of integration abstractions.
TL;DR: the problem being solved is to retry a message (in case of failures in processing) a few times with a large delay between retries (say 10 mins or more). The approach makes use of the RabbitMQ support for
Dead Letter Exchanges.
The gist of the flow is:
1. A Work dispatcher creates “Work Unit”(s) and sends them to a RabbitMQ queue via an exchange.
2. The Work queue is configured with a Dead Letter Exchange. If message processing fails for any reason, the “Work Unit” ends up in the Work Unit Dead Letter Queue.
3. The Work Unit Dead Letter Queue is in turn configured with the Work Unit exchange as its Dead Letter Exchange, which creates a cycle. Further, the expiration of messages in the Dead Letter Queue is set to, say, 10 mins, so that once a message expires it goes back to the Work Unit queue.
4. To break the cycle, the processing code has to stop processing a message once a certain retry count threshold is exceeded.
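The four steps above can be sketched as a small in-memory model, with no RabbitMQ involved. This is only an illustration of the cycle, not the actual broker behaviour: the class `RetryCycleSketch`, the `Handler` interface and the `deathCount` field are hypothetical names introduced here, and message expiry is modelled as an immediate move back to the work queue rather than a timed delay.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of the retry cycle; not RabbitMQ code.
final class RetryCycleSketch {

    // A payload plus the number of times it has been dead-lettered so far.
    static final class Message {
        final String payload;
        int deathCount;

        Message(String payload) {
            this.payload = payload;
        }
    }

    // Stand-in for the processing code; returns false to signal a failure.
    interface Handler {
        boolean process(String payload);
    }

    // Runs the cycle until every message has either succeeded or been dropped.
    // Returns the payloads that were given up on after maxRetries retries.
    static List<String> run(List<String> payloads, Handler handler, int maxRetries) {
        Deque<Message> workQueue = new ArrayDeque<>();
        Deque<Message> deadLetterQueue = new ArrayDeque<>();
        List<String> dropped = new ArrayList<>();

        // Step 1: the dispatcher puts work units on the work queue.
        for (String payload : payloads) {
            workQueue.add(new Message(payload));
        }

        while (!workQueue.isEmpty() || !deadLetterQueue.isEmpty()) {
            // Step 3: "expired" dead letters flow back to the work queue
            // (the real set-up waits for the TTL; here it is immediate).
            while (!deadLetterQueue.isEmpty()) {
                workQueue.add(deadLetterQueue.poll());
            }

            Message message = workQueue.poll();

            // Step 4: break the cycle once the threshold is exceeded.
            if (message.deathCount > maxRetries) {
                dropped.add(message.payload);
                continue;
            }

            // Step 2: a failed message ends up in the dead letter queue.
            if (!handler.process(message.payload)) {
                message.deathCount++;
                deadLetterQueue.add(message);
            }
        }
        return dropped;
    }
}
```

With `maxRetries` set to 3, a message that always fails is attempted four times in this model (the original delivery plus three retries) before it is dropped.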
Implementation using Spring Integration
I have covered a straight happy-path flow using Spring Integration and RabbitMQ
before; here I will mostly be building on top of that code.
A good part of the set-up is the configuration of the appropriate dead letter exchanges and queues, which looks like this when expressed using Spring’s Java Configuration:
    @Configuration
    public class RabbitConfig {

        @Autowired
        private ConnectionFactory rabbitConnectionFactory;

        @Bean
        Exchange worksExchange() {
            return ExchangeBuilder.topicExchange("work.exchange")
                    .durable()
                    .build();
        }

        @Bean
        public Queue worksQueue() {
            return QueueBuilder.durable("work.queue")
                    .withArgument("x-dead-letter-exchange", worksDlExchange().getName())
                    .build();
        }

        @Bean
        Binding worksBinding() {
            return BindingBuilder
                    .bind(worksQueue())
                    .to(worksExchange()).with("#").noargs();
        }

        // Dead letter exchange for holding rejected work units..
        @Bean
        Exchange worksDlExchange() {
            return ExchangeBuilder
                    .topicExchange("work.exchange.dl")
                    .durable()
                    .build();
        }

        // Queue to hold Deadletter messages from worksQueue
        @Bean
        public Queue worksDLQueue() {
            return QueueBuilder
                    .durable("works.queue.dl")
                    .withArgument("x-message-ttl", 20000)
                    .withArgument("x-dead-letter-exchange", worksExchange().getName())
                    .build();
        }

        @Bean
        Binding worksDlBinding() {
            return BindingBuilder
                    .bind(worksDLQueue())
                    .to(worksDlExchange()).with("#")
                    .noargs();
        }

        ...
    }
Note that here I have set the TTL of the Dead Letter queue to 20 seconds (20000 milliseconds), which means that after 20 seconds a failed message will be back in the processing queue. Once this set-up is in place and the appropriate structures are created in RabbitMQ, the consuming part of the code looks like this, expressed using the
Spring Integration Java DSL:
    @Configuration
    public class WorkInbound {

        @Autowired
        private RabbitConfig rabbitConfig;

        @Bean
        public IntegrationFlow inboundFlow() {
            return IntegrationFlows.from(
                    Amqp.inboundAdapter(rabbitConfig.workListenerContainer()))
                    .transform(Transformers.fromJson(WorkUnit.class))
                    .log()
                    .filter("(headers['x-death'] != null) ? headers['x-death'][0].count <= 3: true",
                            f -> f.discardChannel("nullChannel"))
                    .handle("workHandler", "process")
                    .get();
        }
    }
Most of the retry logic here is handled by the RabbitMQ infrastructure. The only change in the consuming code is to break the cycle by explicitly discarding the message once a fixed retry threshold is exceeded (a count of 3 in the filter above). This break is expressed as a filter that looks at the “x-death” header, which RabbitMQ adds to a message once it is sent to a Dead Letter Exchange. The filter is admittedly a little ugly; it can likely be expressed a little better in Java code.
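As a rough idea of what that Java version could look like, here is a minimal sketch of the same check written against a plain map of headers. The class and method names (`RetryLimit`, `withinRetryLimit`) and the `MAX_RETRIES` constant are hypothetical, and the sketch assumes the “x-death” header arrives as a list of maps whose first entry carries a numeric `count`, which is what the SpEL expression above relies on. Unlike the expression, it lets a message through when the header has an unexpected shape.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper mirroring the SpEL filter; not part of the sample repo.
final class RetryLimit {

    // Same threshold as the "count <= 3" check in the filter expression.
    static final long MAX_RETRIES = 3;

    // Returns true if the message should still be processed,
    // false once it has been dead-lettered more than MAX_RETRIES times.
    static boolean withinRetryLimit(Map<String, Object> headers) {
        Object xDeath = headers.get("x-death");
        if (!(xDeath instanceof List) || ((List<?>) xDeath).isEmpty()) {
            // No x-death header: the message has never been dead-lettered.
            return true;
        }
        Object first = ((List<?>) xDeath).get(0);
        if (!(first instanceof Map)) {
            // Assumption: an unexpected header shape is treated as "process it".
            return true;
        }
        Object count = ((Map<?, ?>) first).get("count");
        if (!(count instanceof Number)) {
            return true;
        }
        return ((Number) count).longValue() <= MAX_RETRIES;
    }
}
```

A method like this could presumably be called from a lambda-based filter in the flow in place of the expression string, passing in the message headers, which keeps the null handling and the threshold in one testable place.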
One more thing to note is that the retry logic could have been expressed in-process using Spring Integration. However, I wanted to investigate a flow where the delay between retries can be long (say 15 to 20 mins). That does not work well in-process, and it is also not cluster safe, as I want any instance of the application to be able to handle the retry of a message.
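For delays of that size, the only part of the set-up that needs to change is the TTL on the Dead Letter queue, which RabbitMQ expects in milliseconds. The following is a small sketch of computing the queue arguments from a `Duration`; the class `DeadLetterQueueArgs` and its method `forDelay` are hypothetical helpers, and only the two argument keys come from the configuration shown earlier.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that builds the arguments for the Dead Letter queue.
final class DeadLetterQueueArgs {

    // delay: how long a failed message waits before it is retried.
    // retryExchange: the exchange expired messages are routed back to.
    static Map<String, Object> forDelay(Duration delay, String retryExchange) {
        Map<String, Object> args = new LinkedHashMap<>();
        // RabbitMQ reads x-message-ttl as a number of milliseconds.
        args.put("x-message-ttl", delay.toMillis());
        args.put("x-dead-letter-exchange", retryExchange);
        return args;
    }
}
```

For example, `DeadLetterQueueArgs.forDelay(Duration.ofMinutes(15), "work.exchange")` yields a TTL of 900000 milliseconds, and each entry of the map could then be passed to the queue builder in the same way as the `withArgument` calls in the configuration.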
If you want to explore further, do try the sample in
my GitHub repo: https://github.com/bijukunjummen/si-dsl-rabbit-sample
Reference:
Retry With RabbitMQ: http://dev.venntro.com/2014/07/back-off-and-retry-with-rabbitmq
Reference: RabbitMQ retries using Spring Integration from our JCG partner Biju Kunjummen at the all and sundry blog.