How error handling works in Spring Integration
1.Introduction
The target of this post is to show you how errors are handled when using the messaging system with Spring Integration. You will see that error handling is different between synchronous and asynchronous messaging. As usual, I’ll skip the chat and proceed with some examples.
- You can get the source code at github.
2.The sample application
I will use a basic example, since I want to focus on exception handling. The application consists in an order service, which receives an order, processes it and returns a confirmation.
Below we can see how the messaging system is configured:
int-config.xml
<context:component-scan base-package="xpadro.spring.integration"/> <int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.OrderService"/> <int:channel id="requestChannel"/> <int:router input-channel="requestChannel" ref="orderRouter" method="redirectOrder"/> <int:channel id="syncChannel"/> <int:channel id="asyncChannel"> <int:queue capacity="5"/> </int:channel> <int:service-activator method="processOrder" input-channel="syncChannel" ref="orderProcessor"/> <int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"> <int:poller fixed-delay="2000"/> </int:service-activator>
The gateway is the entry point of the messaging system. It will receive the order and send it to the direct channel “requestChannel” where a router will redirect it to the appropriate channel based on the order id:
- syncChannel: A direct channel that will send the order to an order processor subscribed to this channel.
- asyncChannel: A queue channel from which the order processor will actively retrieve the order.
Once the order is processed, an order confirmation will be sent back to the gateway. Here is a graphic representing this:
Ok, let’s start with the simplest case, synchronous sending using a Direct Channel.
3.Synchronous sending with Direct channel
The order processor is subscribed to the “syncChannel” Direct Channel. The “processOrder” method will be invoked in the sender’s thread.
public OrderConfirmation processOrder(Order order) { logger.info("Processing order {}", order.getId()); if (isInvalidOrder(order)) { logger.info("Error while processing order [{}]", ERROR_INVALID_ID); throw new InvalidOrderException(ERROR_INVALID_ID); } return new OrderConfirmation("confirmed"); }
Now, we will implement a test that will provoke an exception by sending an invalid order. This test will send an order to the gateway:
public interface OrderService { @Gateway public OrderConfirmation sendOrder(Order order); }
The test:
TestSyncErrorHandling.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestSyncErrorHandling { @Autowired private OrderService service; @Test public void testCorrectOrder() { OrderConfirmation confirmation = service.sendOrder(new Order(3, "a correct order")); Assert.assertNotNull(confirmation); Assert.assertEquals("confirmed", confirmation.getId()); } @Test public void testSyncErrorHandling() { OrderConfirmation confirmation = null; try { confirmation = service.sendOrder(new Order(1, "an invalid order")); Assert.fail("Should throw a MessageHandlingException"); } catch (MessageHandlingException e) { Assert.assertEquals(InvalidOrderException.class, e.getCause().getClass()); Assert.assertNull(confirmation); } } }
We run the test and see how an exception is raised in the order processor and reaches the test. That’s fine; we wanted to validate that sending an invalid order raised an exception. This happened because the test sent the order and blocked waiting for the order to be processed in the same thread. But, what happens when we use an asynchronous channel? Let’s continue to the next section.
4.Asynchronous sending with Queue Channel
This section’s test sends an order that will be redirected by the router to the queue channel. The gateway is shown below:
public interface OrderService { @Gateway public Future<OrderConfirmation> sendFutureOrder(Order order); }
Notice that this time the gateway is returning a Future. If we didn’t return this, the gateway would block the test thread. By returning Future, the gateway becomes asynchronous and doesn’t block the sender’s thread.
The test:
TestKoAsyncErrorHandling.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestKoAsyncErrorHandling { @Autowired private OrderService service; @Test(expected=MessageHandlingException.class) public void testAsyncErrorHandling() throws InterruptedException, ExecutionException { Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order")); } }
Ok, so now we are going to launch the test and see the exception raising…
java.lang.AssertionError: Expected exception: org.springframework.integration.MessageHandlingException
Oops the test failed because no exception reached the test! What happened? Well, the explanation is below:
<int:channel id="asyncChannel"> <int:queue capacity="5"/> </int:channel> <int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"> <int:poller fixed-delay="2000"/> </int:service-activator>
Since we are using an asynchronous channel (a queue), the sender sends the order and moves on. Then, the receiver polls the order from a different thread. For this reason, it won’t be possible to throw the Exception back to the sender. Let’s act like nothing happened then? Well you better not, there are other options.
5.Asynchronous error handling
When using asynchronous messaging, Spring Integration handles exceptions by publishing them to message channels. The exception thrown will be wrapped into a MessagingException and become the payload of the message.
What channel is the error message sent to? First, it will check if the request message contains a header called “errorChannel”. If found, the error message will be sent there. Otherwise, the message will be sent to a so-called global error channel.
5.1 Global error channel
By default, a global error channel called “errorChannel” is created by Spring Integration. This channel is a publish-subscribe channel. This means we can subscribe several endpoints to this channel. In fact, there’s already an endpoint subscribed to it: a logging handler.This handler will log the payload of messages arriving to the channel, though it can be configured to behave differently.
We will now subscribe a new handler to this global channel and test that it receives the exception message by storing it into a database.
First of all, we will need to change a few things in our configuration. I’ve created a new file so it doesn’t interfere with our previous tests:
int-async-config.xml
<context:component-scan base-package="xpadro.spring.integration"/> <int:gateway default-request-channel="asyncChannel" service-interface="xpadro.spring.integration.service.OrderService" error-channel="errorChannel"/> <int:channel id="asyncChannel"> <int:queue capacity="5"/> </int:channel> <int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"> <int:poller fixed-delay="2000"/> </int:service-activator> <int:service-activator input-channel="errorChannel" ref="orderErrorHandler" method="handleFailedOrder"/> <bean id="orderErrorHandler" class="xpadro.spring.integration.activator.OrderErrorHandler"/>
The gateway: I’ve added an error channel. If the invocation fails, the error message will be sent to this channel. If I hadn’t defined an error channel, the gateway would have propagated the exception to the caller, but in this case it wouldn’t have worked since this is an asynchronous gateway.
The error handler: I’ve defined a new endpoint that is subscribed to the global error channel. Now, any error message sent to the global error channel will be delivered to our handler.
I’ve also added a configuration file in order to configure the database. Our error handler will insert received errors to this database:
db-config.xml
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <constructor-arg ref="dataSource"/> </bean> <!-- in-memory database --> <jdbc:embedded-database id="dataSource"> <jdbc:script location="classpath:db/schemas/schema.sql" /> </jdbc:embedded-database>
The error handler is pretty simple; it receives the error message and inserts its information to the database:
public class OrderErrorHandler { @Autowired private JdbcTemplate jdbcTemplate; @ServiceActivator public void handleFailedOrder(Message<MessageHandlingException> message) { Order requestedOrder = (Order) message.getPayload().getFailedMessage().getPayload(); saveToBD(requestedOrder.getId(), message.getPayload().getMessage()); } private void saveToBD(int orderId, String errorMessage) { String query = "insert into errors(orderid, message) values (?,?)"; jdbcTemplate.update(query, orderId, errorMessage); } }
Ok, now is all set. Let’s implement a new test:
TestOkAsyncErrorHandlingTest.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-async-config.xml", "/xpadro/spring/integration/config/db-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestOkAsyncErrorHandling { @Autowired private JdbcTemplate jdbcTemplate; @Autowired private OrderService service; @Before public void prepareTest() { jdbcTemplate.update("delete from errors"); } @Test public void testCorrectOrder() throws InterruptedException, ExecutionException { Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(7, "another correct order")); OrderConfirmation orderConfirmation = confirmation.get(); Assert.assertNotNull(orderConfirmation); Assert.assertEquals("confirmed", orderConfirmation.getId()); } @Test public void testAsyncErrorHandling() throws InterruptedException, ExecutionException { Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order")); Thread.sleep(2000); Assert.assertEquals(1, getSavedErrors()); validateSavedError(6); } private int getSavedErrors() { return jdbcTemplate.queryForObject("select count(*) from errors", Integer.class); } private void validateSavedError(int orderId) { String query = "select * from errors where orderid=?"; Map<String, Object> result = jdbcTemplate.queryForMap(query, orderId); Assert.assertEquals(6, result.get("orderid")); assertThat((String)result.get("message"), containsString("Order ID is invalid")); } }
This time the test is successful, the error message has been stored to the database.
5.2 Other mechanisms
Custom error channel: You can define your error channel and define it as a queue channel instead of the default publish-subscribe channel:
<int:poller id="defaultPoller" default="true" fixed-delay="5000" /> <int:channel id="errorChannel"> <int:queue capacity="10"/> </int:channel>
ErrorMessageExceptionTypeRouter: This Spring Integration specialized router will resolve the channel where the error message will be sent. It bases its decision on the most specific cause of the error:
<int:exception-type-router input-channel="errorChannel" default-output-channel="genericErrorChannel"> <int:mapping exception-type="xpadro.spring.integration.exception.InvalidOrderException" channel="invalidChannel" /> <int:mapping exception-type="xpadro.spring.integration.exception.FooException" channel="fooChannel" /> </int:exception-type-router>
6.Conclusion
We have learnt what are the different mechanisms for error handling when using Spring Integration. With this base, you will be able to extend it and configure your error handling by implementing transformers to extract the information from the error message, using header enrichers for setting error channel or implementing your own router among other things.