Mule ESB, ActiveMQ and the DLQ
In this post I will show a simple Mule ESB flow that demonstrates the dead letter queue (DLQ) feature of ActiveMQ in action.
I assume you have a running Apache ActiveMQ instance available (if not, you can download a version here). This example uses Mule ESB 3.4.2 and ActiveMQ 5.9.0. We can create a simple Mule project based on the following pom file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>net.pascalalma.demo</groupId>
    <artifactId>activemq-test-flow</artifactId>
    <packaging>mule</packaging>
    <name>${project.artifactId}</name>
    <version>1.0.0-SNAPSHOT</version>

    <properties>
        <mule.version>3.4.2</mule.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <jdk.version>1.7</jdk.version>
        <junit.version>4.9</junit.version>
        <activemq.version>5.9.0</activemq.version>
    </properties>

    <dependencies>
        <!-- Mule Dependencies -->
        <dependency>
            <groupId>org.mule</groupId>
            <artifactId>mule-core</artifactId>
            <version>${mule.version}</version>
        </dependency>
        <!-- Mule Transports -->
        <dependency>
            <groupId>org.mule.transports</groupId>
            <artifactId>mule-transport-jms</artifactId>
            <version>${mule.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mule.transports</groupId>
            <artifactId>mule-transport-vm</artifactId>
            <version>${mule.version}</version>
        </dependency>
        <!-- Mule Modules -->
        <dependency>
            <groupId>org.mule.modules</groupId>
            <artifactId>mule-module-client</artifactId>
            <version>${mule.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mule.modules</groupId>
            <artifactId>mule-module-scripting</artifactId>
            <version>${mule.version}</version>
        </dependency>
        <!-- for testing -->
        <dependency>
            <groupId>org.mule.tests</groupId>
            <artifactId>mule-tests-functional</artifactId>
            <version>${mule.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>${activemq.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.5</version>
                <configuration>
                    <encoding>${project.build.sourceEncoding}</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.mule.tools</groupId>
                <artifactId>maven-mule-plugin</artifactId>
                <version>1.9</version>
                <extensions>true</extensions>
                <configuration>
                    <copyToAppsDirectory>false</copyToAppsDirectory>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
There is nothing special here. Besides the necessary dependencies, I have added the maven-mule-plugin so that I can use the ‘mule’ packaging type and run Mule from my IDE.
With this Maven pom in place we can create the following two Mule configurations. The first one contains the Mule flow to test our transaction:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting"
      version="EE-3.4.1"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd">

    <flow name="MainFlow">
        <inbound-endpoint ref="event-queue" />
        <logger category="net.pascalalma.demo.MainFlow" level="INFO" message="Received message from activeMQ" />
        <scripting:component>
            <scripting:script engine="Groovy">
                throw new Exception('Soap Fault Response detected')
            </scripting:script>
        </scripting:component>
        <outbound-endpoint ref="result-queue" />
    </flow>
</mule>
In this flow we receive a message from the inbound endpoint, log a message and throw an exception before the message is put on the next queue. As we can see, I didn’t add any exception handler. The configuration of the endpoints and connectors looks like this:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
      xmlns:spring="http://www.springframework.org/schema/beans"
      version="EE-3.4.1"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd">

    <spring:bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <spring:property name="maximumRedeliveries" value="5"/>
        <spring:property name="initialRedeliveryDelay" value="500"/>
        <spring:property name="maximumRedeliveryDelay" value="10000"/>
        <spring:property name="useExponentialBackOff" value="false"/>
        <spring:property name="backOffMultiplier" value="3"/>
    </spring:bean>

    <!-- ActiveMQ Connection factory -->
    <spring:bean id="amqFactory" class="org.apache.activemq.ActiveMQConnectionFactory" lazy-init="true">
        <spring:property name="brokerURL" value="tcp://localhost:61616" />
        <spring:property name="redeliveryPolicy" ref="redeliveryPolicy" />
    </spring:bean>

    <jms:activemq-connector name="activeMqConnector"
                            connectionFactory-ref="amqFactory"
                            persistentDelivery="true"
                            numberOfConcurrentTransactedReceivers="2"
                            specification="1.1" />

    <jms:endpoint name="event-queue" connector-ref="activeMqConnector" queue="event-queue">
        <jms:transaction action="ALWAYS_BEGIN" />
    </jms:endpoint>

    <jms:endpoint name="result-queue" connector-ref="activeMqConnector" queue="result-queue">
        <jms:transaction action="ALWAYS_JOIN" />
    </jms:endpoint>
</mule>
I defined a Spring bean for the ActiveMQ connection factory and one for the redelivery policy of this factory. With this redelivery policy we can configure how often Mule should retry processing a message from the queue when the original attempt failed. A nice feature of the redelivery policy is the combination of ‘backOffMultiplier’ and ‘useExponentialBackOff’. With these options the period between two redelivery attempts increases exponentially until ‘maximumRedeliveryDelay’ is reached. From then on, every following attempt waits for ‘maximumRedeliveryDelay’. Note that in the configuration above ‘useExponentialBackOff’ is set to false, so the ‘backOffMultiplier’ only takes effect once you switch it to true.
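To get a feeling for what exponential back-off does to the waiting times, here is a minimal sketch in plain Java. It is a simplified model and not ActiveMQ's actual implementation: the class name RedeliveryDelaySketch and the method exponentialSchedule are made up for this illustration, and the real client has additional settings (such as collision avoidance) that are ignored here. It assumes ‘useExponentialBackOff’ is switched to true.

```java
import java.util.Arrays;

// Simplified model of an exponential back-off schedule.
// Hypothetical helper for illustration only, NOT ActiveMQ's own code.
public class RedeliveryDelaySketch {

    /**
     * Returns the wait time in milliseconds before each redelivery attempt,
     * multiplying the delay after every attempt and capping it at maximumDelay.
     */
    static long[] exponentialSchedule(long initialDelay, long maximumDelay,
                                      double backOffMultiplier, int maximumRedeliveries) {
        long[] delays = new long[maximumRedeliveries];
        long delay = initialDelay;
        for (int i = 0; i < maximumRedeliveries; i++) {
            delays[i] = delay;
            // Grow the delay for the next attempt, but never beyond the maximum.
            delay = Math.min((long) (delay * backOffMultiplier), maximumDelay);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values taken from the redeliveryPolicy bean in this post.
        long[] delays = exponentialSchedule(500, 10000, 3, 5);
        System.out.println(Arrays.toString(delays));
    }
}
```

With the values from the redelivery policy in this post, the model gives waiting times of 500, 1500, 4500, 10000 and 10000 milliseconds: the fourth delay would be 13500, but it is capped at ‘maximumRedeliveryDelay’.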
So with these configurations we can create a Mule test class and run it. The test class would look something like this:
package net.pascalalma.demo;

import org.junit.Test;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleMessage;
import org.mule.module.client.MuleClient;
import org.mule.tck.junit4.FunctionalTestCase;

public class TransactionFlowTest extends FunctionalTestCase {

    @Override
    protected String getConfigResources() {
        return "app/test-flow.xml, app/test-endpoints.xml";
    }

    @Test
    public void testError() throws Exception {
        MuleClient client = new MuleClient(muleContext);
        MuleMessage inMsg = new DefaultMuleMessage("<txt>Some message</txt>", muleContext);
        client.dispatch("event-queue", inMsg);

        // Give Mule the chance to redeliver the message
        Thread.sleep(4000);
    }
}
If we run this test we will see messages like this in the log:
Exception stack is: 1. "Message with id "ID:Pascals-MacBook-Pro-2.local-59158-1406440948059-1:1:3:1:1" has been redelivered 3 times on endpoint "jms://event-queue", which exceeds the maxRedelivery setting of 0 on the connector "activeMqConnector". Message payload is of type: ActiveMQTextMessage (org.mule.transport.jms.redelivery.MessageRedeliveredException) org.mule.transport.jms.redelivery.JmsXRedeliveryHandler:87 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/transport/jms/redelivery/MessageRedeliveredException.html)
If we now switch to the ActiveMQ console, which can be reached at http://localhost:8161 for the default local installation, we can see the following queues:
As expected, we see that two queues have been created: the event-queue, which is empty, and the default ActiveMQ.DLQ, which contains our message:
As you can imagine, it might be handy to have a specific DLQ for each queue instead of one DLQ that contains all kinds of undeliverable messages. Luckily this is easy to configure in ActiveMQ. Just put the following in the ‘activemq.xml’ file that can be found in the ‘$ACTIVEMQ_HOME/conf’ folder, inside the ‘policyEntries’ element of the broker’s destination policy.
<!-- Set the following policy on all queues using the '>' wildcard -->
<policyEntry queue=">">
    <deadLetterStrategy>
        <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
    </deadLetterStrategy>
</policyEntry>
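The naming convention that follows from this strategy can be sketched in a few lines of Java. This is only an illustration of how the DLQ name is derived from the ‘queuePrefix’ and the original queue name. The class DlqNameSketch and the method dlqNameFor are hypothetical and not part of ActiveMQ.

```java
// Illustration of the DLQ naming under individualDeadLetterStrategy.
// Hypothetical helper, not an ActiveMQ API.
public class DlqNameSketch {

    /** Builds the dead letter queue name by putting the prefix in front of the queue name. */
    static String dlqNameFor(String queuePrefix, String queueName) {
        return queuePrefix + queueName;
    }

    public static void main(String[] args) {
        // Prefix from the policyEntry above, queue name from the Mule endpoint configuration.
        System.out.println(dlqNameFor("DLQ.", "event-queue"));
    }
}
```

For the queue used in this post this yields DLQ.event-queue, so that is the queue in which to look for the failed message.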
If we now restart ActiveMQ, remove the existing queues and rerun our test we see the following result:
So with this setup each queue has its own DLQ. For more options regarding these ActiveMQ settings see here. With the Mule flow created in this post it is easy to test and play with these settings.
Reference: Mule ESB, ActiveMQ and the DLQ from our JCG partner Pascal Alma at The Pragmatic Integrator blog.