Creating an object stream from a JDBC ResultSet
The Stream API and lambda expressions introduced in Java 8 allow an elegant conversion from a JDBC ResultSet to a stream of objects: all we need to provide is a mapping function, which can of course be a lambda. The basic idea is to generate a Stream using a ResultSet as the Supplier:
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.function.Function;
    import java.util.function.Supplier;

    public class ResultSetSupplier<T> implements Supplier<T> {

        private final ResultSet rs;
        private final Function<ResultSet, T> mappingFunction;

        public ResultSetSupplier(ResultSet rs, Function<ResultSet, T> mappingFunction) {
            this.rs = rs;
            this.mappingFunction = mappingFunction;
        }

        @Override
        public T get() {
            try {
                // Advance the cursor and map the current row, if there is one
                if (rs.next())
                    return mappingFunction.apply(rs);
            } catch (SQLException e) {
                e.printStackTrace();
            }
            // No more rows (or an error): the caller receives null
            return null;
        }
    }
The mappingFunction parameter, which may be a lambda expression, builds T instances from a ResultSet. As in the ActiveRecord pattern, every row of the ResultSet maps to an instance of T, and its columns map to attributes of T. Note that get() returns null once the ResultSet is exhausted, so a stream generated from this supplier does not end by itself. Let’s consider a City class:
    public class City {

        String city;
        String country;

        public City(String city, String country) {
            this.city = city;
            this.country = country;
        }

        public String getCountry() {
            return country;
        }

        @Override
        public String toString() {
            return "City [city=" + city + ", country=" + country + "]";
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + ((city == null) ? 0 : city.hashCode());
            result = prime * result + ((country == null) ? 0 : country.hashCode());
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            City other = (City) obj;
            if (city == null) {
                if (other.city != null)
                    return false;
            } else if (!city.equals(other.city))
                return false;
            if (country == null) {
                if (other.country != null)
                    return false;
            } else if (!country.equals(other.country))
                return false;
            return true;
        }
    }
The mapping function for City objects could be a lambda expression like the following:
    (ResultSet rs) -> {
        try {
            return new City(rs.getString("city"), rs.getString("country"));
        } catch (Exception e) {
            return null;
        }
    }
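The lambda above hides any failure by returning null, which makes a broken mapping hard to notice further down the pipeline. One possible alternative, shown here as a minimal sketch that is not part of the original code, is a small adapter that lets the mapper throw SQLException and rethrows it unchecked. The names RowMappers, SqlRowMapper and unchecked are hypothetical.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.function.Function;

// Hypothetical helper, not part of the original article's code
final class RowMappers {

    /** Like Function<ResultSet, T>, but allowed to throw SQLException. */
    @FunctionalInterface
    interface SqlRowMapper<T> {
        T map(ResultSet rs) throws SQLException;
    }

    /** Adapts a throwing mapper to a plain Function, rethrowing SQLException unchecked. */
    static <T> Function<ResultSet, T> unchecked(SqlRowMapper<T> mapper) {
        return rs -> {
            try {
                return mapper.map(rs);
            } catch (SQLException e) {
                // Keep the original exception as the cause instead of swallowing it
                throw new IllegalStateException("Could not map the current row", e);
            }
        };
    }
}
```

With such an adapter, the City mapping could shrink to RowMappers.unchecked(rs -> new City(rs.getString("city"), rs.getString("country"))), at the price of an unchecked exception surfacing from the stream when a column cannot be read.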
We have assumed that the database columns are called city and country, respectively. Both PreparedStatement and ResultSet implement the AutoCloseable interface, but since a ResultSet has to be provided to create the object stream, it makes sense to close that ResultSet when the stream is closed as well. One possible approach is to use a proxy that intercepts method invocations on the object stream. When close() is invoked on the proxy, the proxy also closes the PreparedStatement, and with it the ResultSet. Every invocation is forwarded to the object stream too, so the proxy still offers all Stream features. This is easy to achieve with a dynamic proxy, for which we need a proxy factory and an invocation handler. Let’s have a look at the invocation handler first:
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.function.Function;
    import java.util.function.Supplier;
    import java.util.stream.Stream;

    public class ResultSetStreamInvocationHandler<T> implements InvocationHandler {

        private Stream<T> stream; // the proxy intercepts method calls to this stream
        private PreparedStatement st;
        private ResultSet rs;

        public void setup(PreparedStatement st, Function<ResultSet, T> mappingFunction)
                throws SQLException {
            // The PreparedStatement must already be set up, so that
            // only executeQuery() remains to be called
            this.st = st;
            rs = st.executeQuery();
            stream = Stream.generate(new ResultSetSupplier(rs, mappingFunction));
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method == null)
                throw new RuntimeException("method must not be null");

            // Implement AutoCloseable for the PreparedStatement;
            // calling close() more than once has no effect
            if (method.getName().equals("close") && args == null) {
                // close() invoked with no arguments
                if (st != null) {
                    st.close(); // closes the ResultSet too
                }
            }

            try {
                return method.invoke(stream, args);
            } catch (InvocationTargetException e) {
                // Rethrow the stream's own exception instead of the reflective wrapper
                throw e.getCause();
            }
        }

        private class ResultSetSupplier implements Supplier<T> {

            private final ResultSet rs;
            private final Function<ResultSet, T> mappingFunction;

            private ResultSetSupplier(ResultSet rs, Function<ResultSet, T> mappingFunction) {
                this.rs = rs;
                this.mappingFunction = mappingFunction;
            }

            @Override
            public T get() {
                try {
                    if (rs.next())
                        return mappingFunction.apply(rs);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
                // No more rows (or an error): the stream receives null
                return null;
            }
        }
    }
Note how invoke is used to intercept method calls. When close() is called, close() is called on the PreparedStatement as well. Every method call, close() included, is then forwarded to the stream being proxied. And the factory:
    import java.lang.reflect.Proxy;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.function.Function;
    import java.util.stream.Stream;

    public class ResultSetStream<T> {

        @SuppressWarnings("unchecked")
        public Stream<T> getStream(PreparedStatement st, Function<ResultSet, T> mappingFunction)
                throws SQLException {
            final ResultSetStreamInvocationHandler<T> handler =
                    new ResultSetStreamInvocationHandler<T>();
            handler.setup(st, mappingFunction);
            // The proxy implements Stream and routes every call through the handler
            Stream<T> proxy = (Stream<T>) Proxy.newProxyInstance(
                    getClass().getClassLoader(), new Class<?>[] {Stream.class}, handler);
            return proxy;
        }
    }
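To see the interception in isolation, here is a minimal sketch of the same dynamic-proxy idea without any JDBC involved. It wraps an arbitrary Stream and closes an arbitrary AutoCloseable when close() is called on the proxy. The class name ClosingStreamProxyDemo and the method name closingWith are hypothetical and only serve the illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original article's code
final class ClosingStreamProxyDemo {

    /** Returns a proxy for delegate that also closes resource when close() is called. */
    @SuppressWarnings("unchecked")
    static <T> Stream<T> closingWith(Stream<T> delegate, AutoCloseable resource) {
        InvocationHandler handler = (proxy, method, args) -> {
            // Intercept the no-argument close() and release the extra resource first
            if (method.getName().equals("close") && method.getParameterCount() == 0) {
                resource.close();
            }
            try {
                // Forward every call, close() included, to the real stream
                return method.invoke(delegate, args);
            } catch (InvocationTargetException e) {
                throw e.getCause();
            }
        };
        return (Stream<T>) Proxy.newProxyInstance(
                ClosingStreamProxyDemo.class.getClassLoader(),
                new Class<?>[] {Stream.class},
                handler);
    }

    public static void main(String[] args) {
        AutoCloseable resource = () -> System.out.println("resource closed");
        try (Stream<String> names = closingWith(Stream.of("Madrid", "Berlin"), resource)) {
            names.map(String::toUpperCase).forEach(System.out::println);
        }
    }
}
```

One consequence of this design is that intermediate operations such as filter or map are forwarded to the real stream and therefore return a plain stream, not a proxy. The resource is only released when close() is called on the proxy itself, which is why the proxy should be the variable held in the try-with-resources statement.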
To put it all together, let’s write a simple test that shows the usage. Mockito is used to mock both PreparedStatement and ResultSet, so the test does not need a real database.
    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertFalse;
    import static org.junit.Assert.assertTrue;
    import static org.mockito.Mockito.eq;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.when;

    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.Iterator;
    import java.util.stream.Stream;

    import org.junit.Before;
    import org.junit.Test;
    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    public class ResultSetStreamTest {

        private class City {

            String city;
            String country;

            public City(String city, String country) {
                this.city = city;
                this.country = country;
            }

            public String getCountry() {
                return country;
            }

            @Override
            public String toString() {
                return "City [city=" + city + ", country=" + country + "]";
            }

            @Override
            public int hashCode() {
                final int prime = 31;
                int result = 1;
                result = prime * result + getOuterType().hashCode();
                result = prime * result + ((city == null) ? 0 : city.hashCode());
                result = prime * result + ((country == null) ? 0 : country.hashCode());
                return result;
            }

            @Override
            public boolean equals(Object obj) {
                if (this == obj)
                    return true;
                if (obj == null)
                    return false;
                if (getClass() != obj.getClass())
                    return false;
                City other = (City) obj;
                if (!getOuterType().equals(other.getOuterType()))
                    return false;
                if (city == null) {
                    if (other.city != null)
                        return false;
                } else if (!city.equals(other.city))
                    return false;
                if (country == null) {
                    if (other.country != null)
                        return false;
                } else if (!country.equals(other.country))
                    return false;
                return true;
            }

            private ResultSetStreamTest getOuterType() {
                return ResultSetStreamTest.this;
            }
        }

        private String[][] data = new String[][] {
            {"Karachi", "Pakistan"},
            {"Istanbul", "Turkey"},
            {"Hong Kong", "China"},
            {"Saint Petersburg", "Russia"},
            {"Sydney", "Australia"},
            {"Berlin", "Germany"},
            {"Madrid", "Spain"}
        };

        private int timesCalled; // index of the current row, -1 before the first next()
        private PreparedStatement mockPST;
        private ResultSet mockRS;

        @Before
        public void setup() throws SQLException {
            timesCalled = -1;
            mockRS = mock(ResultSet.class);
            mockPST = mock(PreparedStatement.class);

            when(mockRS.next()).thenAnswer(new Answer<Boolean>() {
                @Override
                public Boolean answer(InvocationOnMock invocation) throws Throwable {
                    // Move to the next row and report whether it exists
                    return ++timesCalled < data.length;
                }
            });

            when(mockRS.getString(eq("city"))).thenAnswer(new Answer<String>() {
                @Override
                public String answer(InvocationOnMock invocation) throws Throwable {
                    return data[timesCalled][0];
                }
            });

            when(mockRS.getString(eq("country"))).thenAnswer(new Answer<String>() {
                @Override
                public String answer(InvocationOnMock invocation) throws Throwable {
                    return data[timesCalled][1];
                }
            });

            when(mockPST.executeQuery()).thenReturn(mockRS);
        }

        @Test
        public void simpleTest() throws SQLException {
            try (Stream<City> testStream = new ResultSetStream<City>().getStream(mockPST,
                    (ResultSet rs) -> {
                        try {
                            return new City(rs.getString("city"), rs.getString("country"));
                        } catch (Exception e) {
                            return null;
                        }
                    })) {

                // limit(3) is what ends the otherwise infinite stream
                Iterator<City> cities = testStream
                        .filter(city -> !city.getCountry().equalsIgnoreCase("China"))
                        .limit(3)
                        .iterator();

                assertTrue(cities.hasNext());
                assertEquals(new City("Karachi", "Pakistan"), cities.next());

                assertTrue(cities.hasNext());
                assertEquals(new City("Istanbul", "Turkey"), cities.next());

                assertTrue(cities.hasNext());
                assertEquals(new City("Saint Petersburg", "Russia"), cities.next());

                assertFalse(cities.hasNext());
            }
        }
    }
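Because Stream.generate never signals the end of the data, the test has to cut the stream with limit(3), and an unbounded terminal operation would keep receiving null once the rows run out. One possible alternative, given here as a sketch under stated assumptions and not as the approach of the original code, builds a finite stream on Spliterators.AbstractSpliterator and registers the cleanup with onClose instead of a proxy. The class name FiniteResultSetStream and the method name of are hypothetical.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical alternative, not part of the original article's code
final class FiniteResultSetStream {

    /** Streams the rows of rs through mapper and ends when rs.next() returns false. */
    static <T> Stream<T> of(ResultSet rs, Function<ResultSet, T> mapper) {
        Spliterator<T> rows = new Spliterators.AbstractSpliterator<T>(
                Long.MAX_VALUE, Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                try {
                    if (!rs.next()) {
                        return false; // no more rows: the stream ends here
                    }
                    action.accept(mapper.apply(rs));
                    return true;
                } catch (SQLException e) {
                    throw new IllegalStateException("Could not read the next row", e);
                }
            }
        };
        // Sequential stream; the close handler runs when close() is called on the stream
        return StreamSupport.stream(rows, false).onClose(() -> {
            try {
                rs.close();
            } catch (SQLException e) {
                throw new IllegalStateException("Could not close the ResultSet", e);
            }
        });
    }
}
```

A stream created this way can be consumed with forEach or collect without limit, and handlers registered with onClose also run when close() is called on a stream derived from it through filter or map. Closing the PreparedStatement as well is left to the caller in this sketch.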
- Download the full source code on GitHub.
Reference: Creating an object stream from a JDBC ResultSet from our JCG partner Sergio Molina at the TODOdev blog.
Sergio, thank you for writing this article.
I had a question. Is it possible that the Stream can be infinite?
stream = Stream.generate(new ResultSetSupplier(resultSet, mappingFunction));
Is .limit the only option to terminate the Stream? Is there any means of closing the Stream from within a forEach block?
    final Stream stream = (new ResultSetStream()).getStream(statement, mappingFunction);
    stream.forEach(r -> {
        System.out.format("Writing row.. %s%n", r);
        if (r != null) {
            write(bufferedWriter, r.toString());
        }
    });
Hi
Thanks for the feedback. There are some interesting comments on that subject at https://tododev.wordpress.com/2014/08/14/creating-and-object-stream-from-a-jdbc-resultset/
BR
Sergio