Spring Integration Jdbc RowMapper Example
The JDBC inbound channel adapter’s basic function is to execute a SQL query, extract the data and pass the result set encapsulated in the form of a Message
onto the local channels. You can read more about this in my example on JDBC Inbound Channel Adapter.
The type of the payload is decided by the row-mapping strategy. The default one results into a payload of type List where each element is a Map of column values. In our previous article on Jdbc Inbound Adapter, we have used the default row-mapping strategy which is why the message contains List of map values as the payload. the column values will be returned as a Map with the column name being the key values.
In this article, we will see how to customize the mapping strategy so that the payload is a List of POJOs. We will also learn about how to handle the JDBC message and the use of transactional
element.
Before we start with the example, lets first add module dependencies to our pom.xml
.
Dependencies
Add the following dependencies:
spring-core
spring-context
spring-integration-core
spring-integration-jdbc
– This is requited to access the jdbc adaptermysql-connector-java
– We will be using MySQL as the Database so you need to add MySql driver
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.javarticles.spring.integration.jms</groupId> <artifactId>springintegrationjms</artifactId> <version>0.0.1-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-core</artifactId> <version>4.1.2.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-jdbc</artifactId> <version>4.1.2.RELEASE</version> <scope>compile</scope> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.26</version> </dependency> </dependencies> <properties> <spring.version>4.1.4.RELEASE</spring.version> </properties> </project>
Custom Row Mapper
Article:
package com.javarticles.spring.integration.jdbc; public class Article { private int id; private String name; private String tags; private String category; private String author; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getTags() { return tags; } public void setTags(String tags) { this.tags = tags; } public String getCategory() { return category; } public void setCategory(String category) { this.category = category; } public String getAuthor() { return author; } public void setAuthor(String author) { this.author = author; } public String toString() { StringBuilder sb = new StringBuilder("id: "); sb.append(id).append(", name: ").append(name).append(", tags: ") .append(tags).append(", category: ").append(category) .append(", author").append(author); return sb.toString(); } }
We will customize the payload object by implementing the org.springframework.jdbc.core.RowMapper interface and referencing this class through the row-mapper attribute.
ArticleRowMapper:
package com.javarticles.spring.integration.jdbc; import java.sql.ResultSet; import java.sql.SQLException; import org.springframework.jdbc.core.RowMapper; public class ArticleRowMapper implements RowMapper<Article> { public Article mapRow(ResultSet rs, int rowNum) throws SQLException { String name = rs.getString("name"); String category = rs.getString("category"); String author = rs.getString("author"); String tags = rs.getString("tags"); int id = rs.getInt("id"); Article article = new Article(); article.setId(id); article.setCategory(category); article.setAuthor(author); article.setName(name); article.setTags(tags); return article; } }
Here is the schema and some test data:
db-schema.sql:
drop table if exists `articles`; CREATE TABLE `articles` ( `ID` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT, `NAME` VARCHAR(100) NOT NULL, `CATEGORY` VARCHAR(50) NOT NULL, `TAGS` VARCHAR(100) NOT NULL, `AUTHOR` VARCHAR(50) NOT NULL, `SENT` INT, PRIMARY KEY (`ID`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
db-test-data.sql:
insert into articles(id, name, category, tags, author, sent) values (1, "SpringIntegration Example", "spring", "spring,integration", "Joe", 0); insert into articles(id, name, category, tags, author, sent) values (2, "NamedParameterJdbcTemplate Example", "spring", "spring,jdbcTemplate", "Sam", 0); insert into articles(id, name, category, tags, author, sent) values (3, "MVC Example", "spring", "spring", "Joe", 0);
We will also introduce transaction concept here. Once the articles are extracted, we want to update the ‘sent’ column to 1 so that the already read articles don’t show up in the next polling.
We will simply add the tansactional element to the poller element. This will cause the update and select queries to run in the same transaction. Since we are relying on transactions, we need to configure the transaction manager.
One more thing that we have introduced is a service adapter, in case, you want to handle the JDBC message.
JdbcMessageHandler:
package com.javarticles.spring.integration.jdbc; import java.util.List; public class JdbcMessageHandler { public void handleMessage(List<Article> articleList) { System.out.println("In JdbcMessageHandler:" + articleList); } }
Let’s see our configuration.
jdbcInboundApplicationContext.xml:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:jdbc="http://www.springframework.org/schema/jdbc" xmlns:int-jdbc="http://www.springframework.org/schema/integration/jdbc" xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/integration/jdbc http://www.springframework.org/schema/integration/jdbc/spring-integration-jdbc.xsd http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.0.xsd"> <int-jdbc:inbound-channel-adapter id="dataChannel" query="select * from articles where author='Joe' and sent = 0" update="update articles set sent = 1 where id in (:id)" data-source="dataSource" row-mapper="articleRowMapper"> <int:poller fixed-rate="100"> <int:transactional /> </int:poller> </int-jdbc:inbound-channel-adapter> <int:service-activator input-channel="dataChannel" ref="jdbcMessageHandler" /> <bean id="jdbcMessageHandler" class="com.javarticles.spring.integration.jdbc.JdbcMessageHandler" /> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="dataSource" /> </bean> <int:poller default="true" fixed-rate="100" /> <int:channel id="dataChannel"> <int:queue /> </int:channel> <jdbc:initialize-database data-source="dataSource" enabled="true"> <jdbc:script location="classpath:db-schema.sql" /> <jdbc:script location="classpath:db-test-data.sql" /> </jdbc:initialize-database> <bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource"> <property name="driverClassName" value="com.mysql.jdbc.Driver" /> <property name="url" value="jdbc:mysql://localhost/test" /> <property name="username" value="root" /> <property name="password" value="mnrpass" /> </bean> <bean id="articleRowMapper" class="com.javarticles.spring.integration.jdbc.ArticleRowMapper" /> </beans>
Our main class looks simple, we just need to load the context to initiate the polling.
SpringIntegrationJdbcInboundRowMapperExample:
package com.javarticles.spring.integration.jdbc; import java.io.IOException; import java.sql.SQLException; import org.springframework.context.support.ClassPathXmlApplicationContext; public class SpringIntegrationJdbcInboundRowMapperExample { public static void main(String[] args) throws InterruptedException, IOException, SQLException { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "jdbcInboundApplicationContext.xml"); Thread.sleep(1000); context.close(); } }
Output:
In JdbcMessageHandler:[id: 1, name: SpringIntegration Example, tags: spring,integration, category: spring, authorJoe, sent: 0, id: 3, name: MVC Example, tags: spring, category: spring, authorJoe, sent: 0]
Download the source code
This was an example about Jdbc Inbound Adapter with a RowMapper. You can download the source code here: springintegrationJdbcRowMapper.zip
Reference: | Spring Integration Jdbc RowMapper Example from our JCG partner Ram Mokkapaty at the Java Articles blog. |