Enterprise Java

How error handling works in Spring Integration

1.Introduction

The target of this post is to show you how errors are handled when using the messaging system with Spring Integration. You will see that error handling is different between synchronous and asynchronous messaging. As usual, I’ll skip the chat and proceed with some examples.

  • You can get the source code at github.

2.The sample application

I will use a basic example, since I want to focus on exception handling. The application consists in an order service, which receives an order, processes it and returns a confirmation.

Below we can see how the messaging system is configured:

int-config.xml

<context:component-scan base-package="xpadro.spring.integration"/>

<int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.OrderService"/>

<int:channel id="requestChannel"/>

<int:router input-channel="requestChannel" ref="orderRouter" method="redirectOrder"/>

<int:channel id="syncChannel"/>

<int:channel id="asyncChannel">
    <int:queue capacity="5"/>
</int:channel>

<int:service-activator method="processOrder" input-channel="syncChannel" ref="orderProcessor"/>

<int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor">
    <int:poller fixed-delay="2000"/>
</int:service-activator>

The gateway is the entry point of the messaging system. It will receive the order and send it to the direct channel “requestChannel” where a router will redirect it to the appropriate channel based on the order id:

  • syncChannel: A direct channel that will send the order to an order processor subscribed to this channel.
  • asyncChannel: A queue channel from which the order processor will actively retrieve the order.

Once the order is processed, an order confirmation will be sent back to the gateway. Here is a graphic representing this:

grafic

Ok, let’s start with the simplest case, synchronous sending using a Direct Channel.

3.Synchronous sending with Direct channel

The order processor is subscribed to the “syncChannel” Direct Channel. The “processOrder” method will be invoked in the sender’s thread.

public OrderConfirmation processOrder(Order order) {
    logger.info("Processing order {}", order.getId());
    
    if (isInvalidOrder(order)) {
        logger.info("Error while processing order [{}]", ERROR_INVALID_ID);
        throw new InvalidOrderException(ERROR_INVALID_ID);
    }
    
    return new OrderConfirmation("confirmed");
}

Now, we will implement a test that will provoke an exception by sending an invalid order. This test will send an order to the gateway:

public interface OrderService {
    @Gateway
    public OrderConfirmation sendOrder(Order order);
}

The test:

TestSyncErrorHandling.java

@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestSyncErrorHandling {
    
    @Autowired
    private OrderService service;
    
    @Test
    public void testCorrectOrder() {
        OrderConfirmation confirmation = service.sendOrder(new Order(3, "a correct order"));
        Assert.assertNotNull(confirmation);
        Assert.assertEquals("confirmed", confirmation.getId());
    }
    
    @Test
    public void testSyncErrorHandling() {
        OrderConfirmation confirmation = null;
        try {
            confirmation = service.sendOrder(new Order(1, "an invalid order"));
            Assert.fail("Should throw a MessageHandlingException");
        } catch (MessageHandlingException e) {
            Assert.assertEquals(InvalidOrderException.class, e.getCause().getClass());
            Assert.assertNull(confirmation);
        }
    }
}

We run the test and see how an exception is raised in the order processor and reaches the test. That’s fine; we wanted to validate that sending an invalid order raised an exception. This happened because the test sent the order and blocked waiting for the order to be processed in the same thread. But, what happens when we use an asynchronous channel? Let’s continue to the next section.

4.Asynchronous sending with Queue Channel

This section’s test sends an order that will be redirected by the router to the queue channel. The gateway is shown below:

public interface OrderService {
    @Gateway
    public Future<OrderConfirmation> sendFutureOrder(Order order);
}

Notice that this time the gateway is returning a Future. If we didn’t return this, the gateway would block the test thread. By returning Future, the gateway becomes asynchronous and doesn’t block the sender’s thread.

The test:

TestKoAsyncErrorHandling.java

@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestKoAsyncErrorHandling {
    
    @Autowired
    private OrderService service;
    
    @Test(expected=MessageHandlingException.class)
    public void testAsyncErrorHandling() throws InterruptedException, ExecutionException {
        Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order"));
    }
}

Ok, so now we are going to launch the test and see the exception raising…

java.lang.AssertionError: Expected exception: org.springframework.integration.MessageHandlingException

Oops the test failed because no exception reached the test! What happened? Well, the explanation is below:

<int:channel id="asyncChannel">
    <int:queue capacity="5"/>
</int:channel>

<int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor">
    <int:poller fixed-delay="2000"/>
</int:service-activator>

Since we are using an asynchronous channel (a queue), the sender sends the order and moves on. Then, the receiver polls the order from a different thread. For this reason, it won’t be possible to throw the Exception back to the sender. Let’s act like nothing happened then? Well you better not, there are other options.

5.Asynchronous error handling

When using asynchronous messaging, Spring Integration handles exceptions by publishing them to message channels. The exception thrown will be wrapped into a MessagingException and become the payload of the message.

What channel is the error message sent to? First, it will check if the request message contains a header called “errorChannel”. If found, the error message will be sent there. Otherwise, the message will be sent to a so-called global error channel.

5.1 Global error channel

By default, a global error channel called “errorChannel” is created by Spring Integration. This channel is a publish-subscribe channel. This means we can subscribe several endpoints to this channel. In fact, there’s already an endpoint subscribed to it: a logging handler.This handler will log the payload of messages arriving to the channel, though it can be configured to behave differently.

We will now subscribe a new handler to this global channel and test that it receives the exception message by storing it into a database.

First of all, we will need to change a few things in our configuration. I’ve created a new file so it doesn’t interfere with our previous tests:

int-async-config.xml

<context:component-scan base-package="xpadro.spring.integration"/>

<int:gateway default-request-channel="asyncChannel" service-interface="xpadro.spring.integration.service.OrderService" 
    error-channel="errorChannel"/>

<int:channel id="asyncChannel">
    <int:queue capacity="5"/>
</int:channel>

<int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor">
    <int:poller fixed-delay="2000"/>
</int:service-activator>

<int:service-activator input-channel="errorChannel" ref="orderErrorHandler" method="handleFailedOrder"/>

<bean id="orderErrorHandler" class="xpadro.spring.integration.activator.OrderErrorHandler"/>

The gateway: I’ve added an error channel. If the invocation fails, the error message will be sent to this channel. If I hadn’t defined an error channel, the gateway would have propagated the exception to the caller, but in this case it wouldn’t have worked since this is an asynchronous gateway.

The error handler: I’ve defined a new endpoint that is subscribed to the global error channel. Now, any error message sent to the global error channel will be delivered to our handler.

I’ve also added a configuration file in order to configure the database. Our error handler will insert received errors to this database:

db-config.xml

<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
    <constructor-arg ref="dataSource"/>
</bean>

<!-- in-memory database -->
<jdbc:embedded-database id="dataSource">
    <jdbc:script location="classpath:db/schemas/schema.sql" />
</jdbc:embedded-database>

The error handler is pretty simple; it receives the error message and inserts its information to the database:

public class OrderErrorHandler {
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    @ServiceActivator
    public void handleFailedOrder(Message<MessageHandlingException> message) {
        Order requestedOrder = (Order) message.getPayload().getFailedMessage().getPayload();
        saveToBD(requestedOrder.getId(), message.getPayload().getMessage());
    }
    
    private void saveToBD(int orderId, String errorMessage) {
        String query = "insert into errors(orderid, message) values (?,?)";
        jdbcTemplate.update(query, orderId, errorMessage);
    }
}

Ok, now is all set. Let’s implement a new test:

TestOkAsyncErrorHandlingTest.java

@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-async-config.xml",
    "/xpadro/spring/integration/config/db-config.xml"})
@RunWith(SpringJUnit4ClassRunner.class)
public class TestOkAsyncErrorHandling {
    @Autowired
    private JdbcTemplate jdbcTemplate;
    
    @Autowired
    private OrderService service;
    
    @Before
    public void prepareTest() {
        jdbcTemplate.update("delete from errors");
    }
    
    @Test
    public void testCorrectOrder() throws InterruptedException, ExecutionException {
        Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(7, "another correct order"));
        OrderConfirmation orderConfirmation = confirmation.get();
        Assert.assertNotNull(orderConfirmation);
        Assert.assertEquals("confirmed", orderConfirmation.getId());
    }
    
    @Test
    public void testAsyncErrorHandling() throws InterruptedException, ExecutionException {
        Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order"));
        Thread.sleep(2000);
        Assert.assertEquals(1, getSavedErrors());
        validateSavedError(6);
    }
    
    private int getSavedErrors() {
        return jdbcTemplate.queryForObject("select count(*) from errors", Integer.class);
    }
    
    private void validateSavedError(int orderId) {
        String query = "select * from errors where orderid=?";
        Map<String, Object> result = jdbcTemplate.queryForMap(query, orderId);
        Assert.assertEquals(6, result.get("orderid"));
        assertThat((String)result.get("message"), containsString("Order ID is invalid"));
    }
}

This time the test is successful, the error message has been stored to the database.

5.2 Other mechanisms

Custom error channel: You can define your error channel and define it as a queue channel instead of the default publish-subscribe channel:

<int:poller id="defaultPoller" default="true" fixed-delay="5000" />

<int:channel id="errorChannel">
    <int:queue capacity="10"/>
</int:channel>

ErrorMessageExceptionTypeRouter: This Spring Integration specialized router will resolve the channel where the error message will be sent. It bases its decision on the most specific cause of the error:

<int:exception-type-router input-channel="errorChannel" default-output-channel="genericErrorChannel">
    <int:mapping exception-type="xpadro.spring.integration.exception.InvalidOrderException" channel="invalidChannel" />
    <int:mapping exception-type="xpadro.spring.integration.exception.FooException" channel="fooChannel" />
</int:exception-type-router>

6.Conclusion

We have learnt what are the different mechanisms for error handling when using Spring Integration. With this base, you will be able to extend it and configure your error handling by implementing transformers to extract the information from the error message, using header enrichers for setting error channel or implementing your own router among other things.
 

Xavier Padro

Xavier is a software developer working in a consulting firm based in Barcelona. He is specialized in web application development with experience in both frontend and backend. He is interested in everything related to Java and the Spring framework.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Back to top button