Spring Integration – Using RMI Channel Adapters
1.Introduction
This article explains how to send and receive messages over RMI using Spring Integration RMI channel adapters. It is composed of the following sections:
- Implementing the service: The first section focuses on creating and exposing a service.
- Implementing the client: Shows how to invoke the service using the MessagingTemplate class.
- Abstracting SI logic: Finally, I’ve added another section explaining how to implement the same client abstracting all Spring Integration code, leaving the client focused on its business logic.
You can get the source code at github.
2.Implementing the service
This first part is pretty simple. The service is defined through annotations, so it will be autodetected by component scanning. It has a repository injected, which gets the data from an embedded database, as it will be shown in this same section:
@Service("defaultEmployeeService") public class EmployeeServiceImpl implements EmployeeService { @Autowired private EmployeeRepository employeeRepository; @Override public Employee retrieveEmployee(int id) { return employeeRepository.getEmployee(id); } }
The repository is as follows:
@Repository public class EmployeeRepositoryImpl implements EmployeeRepository { private JdbcTemplate template; private RowMapper<Employee> rowMapper = new EmployeeRowMapper(); private static final String SEARCH = "select * from employees where id = ?"; private static final String COLUMN_ID = "id"; private static final String COLUMN_NAME = "name"; @Autowired public EmployeeRepositoryImpl(DataSource dataSource) { this.template = new JdbcTemplate(dataSource); } public Employee getEmployee(int id) { return template.queryForObject(SEARCH, rowMapper, id); } private class EmployeeRowMapper implements RowMapper<Employee> { public Employee mapRow(ResultSet rs, int i) throws SQLException { Employee employee = new Employee(); employee.setId(rs.getInt(COLUMN_ID)); employee.setName(rs.getString(COLUMN_NAME)); return employee; } } }
The following configuration exposes the service over RMI:
server-config.xml
<context:component-scan base-package="xpadro.spring.integration"/> <int-rmi:inbound-gateway request-channel="requestEmployee"/> <int:channel id="requestEmployee"/> <int:service-activator method="retrieveEmployee" input-channel="requestEmployee" ref="defaultEmployeeService"/> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource" /> </bean> <!-- in-memory database --> <jdbc:embedded-database id="dataSource"> <jdbc:script location="classpath:db/schemas/schema.sql" /> <jdbc:script location="classpath:db/schemas/data.sql" /> </jdbc:embedded-database>
Let’s focus on the lines with the ‘int’ namespace:
The gateway’s function is to separate the plumbing of the messaging system from the rest of the application. In this way, it gets hidden from the business logic. A gateway is bidirectional, so you have:
- Inbound gateway: Brings a message into the application and waits for a response.
- Outbound gateway: Invokes an external system and sends the response back to the application.
In this example, we are using a RMI inbound gateway. It will receive a message over RMI and send it to the requestEmployee channel, which is also defined here.
Finally, the service activator allows you to connect a spring bean to a message channel. Here, it is connected to the requestEmployee channel. The message will arrive to the channel and the service activator will invoke the retrieveEmployee method. Take into account that the ‘method’ attribute is not necessary if the bean has only one public method or has a method annotated with @ServiceActivator.
The response will then be sent to the reply channel. Since we didn’t define this channel, it will create a temporary reply channel.
3.Implementing the client
The client we are going to implement will invoke the service in order to retrieve an employee. To do this, it will use the MessagingTemplate class:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"classpath:xpadro/spring/integration/test/config/client-config.xml"}) public class TestRmiClient { @Autowired MessageChannel localChannel; @Autowired MessagingTemplate template; @Test public void retrieveExistingEmployee() { Employee employee = (Employee) template.convertSendAndReceive(localChannel, 2); Assert.assertNotNull(employee); Assert.assertEquals(2, employee.getId()); Assert.assertEquals("Bruce Springsteen", employee.getName()); } }
The client uses the messagingTemplate to convert the Integer object to a Message and send it to the local channel. As shown below, there’s an outbound gateway connected to the local channel. This outbound gateway will send the request message over RMI.
<int-rmi:outbound-gateway request-channel="localChannel" remote-channel="requestEmployee" host="localhost"/> <int:channel id="localChannel"/> <bean class="org.springframework.integration.core.MessagingTemplate" />
4.Abstracting SI logic
In the previous section, you may have noticed that the client class which accesses the service has Spring Integration specific logic mixed with its business code:
- It uses the MessagingTemplate, which is a SI class.
- It knows about the local channel, which is specific to the messaging system
In this section, I will implement this same example abstracting the messaging logic, so the client will only care about its business logic.
First, let’s take a look at the new client:
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"classpath:xpadro/spring/integration/test/config/client-gateway-config.xml"}) public class TestRmiGatewayClient { @Autowired private EmployeeService service; @Test public void retrieveExistingEmployee() { Employee employee = service.retrieveEmployee(2); Assert.assertNotNull(employee); Assert.assertEquals(2, employee.getId()); Assert.assertEquals("Bruce Springsteen", employee.getName()); } }
We can see now that the client just implement its business logic, without using neither message channels nor messaging template. It will just call the service interface. All the messaging definitions are in the configuration file.
<int-rmi:outbound-gateway request-channel="localChannel" remote-channel="requestEmployee" host="localhost"/> <int:channel id="localChannel"/> <int:gateway default-request-channel="localChannel" service-interface="xpadro.spring.integration.service.EmployeeService"/>
client-gateway-config.xml
What we did here is add a gateway that will intercept calls to the service interface EmployeeService. Spring Integration will use the GatewayProxyFactoryBean class to create a proxy around the service interface. This proxy will use a messaging template to send the invocation to the request channel and wait for the response.
5.Conclusion
We have seen how to use Spring Integration to access a service over RMI. We have also seen that we can not only explicitly send messages using the MessagingTemplate but also do it transparently with GatewayProxyFactoryBean.
It thrown an exception when I ran TestRmiClient. Could you please help to fix it?
org.springframework.integration.MessageDeliveryException: Dispatcher has no subscribers.
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:104)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:61)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:157)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:128)
at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:288)
at org.springframework.integration.core.MessagingTemplate.doSendAndReceive(MessagingTemplate.java:318)
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:239)
at org.springframework.integration.core.MessagingTemplate.convertSendAndReceive(MessagingTemplate.java:254)
at xpadro.spring.integration.client.TestRmiClient.retrieveExistingEmployee(TestRmiClient.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:74)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:83)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:72)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:231)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:88)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:71)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:174)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Hi Logan,
Just cloned my github repository and run the test. Everything goes fine. Did you modify anything? The error message shows that the template is not finding its subscriber, which is the outbound gateway:
Did you modify the configuration?