Processing JDBC ResultSet with Stream API
Java’s Stream API, introduced in Java 8, is an incredibly powerful tool for handling collections of data in a functional style. However, when working with JDBC, which relies on ResultSet to retrieve database query results, directly integrating the Stream API with JDBC ResultSet can be challenging. In this article, we’ll explore various methods to process a ResultSet using the Stream API effectively, covering approaches like spliterators, third-party libraries such as JOOQ and jdbc-stream, and emphasizing best practices for resource management. All examples are tailored for Java 21, showcasing how to seamlessly integrate the Stream API with JDBC ResultSet.
1. Overview
JDBC ResultSet is a cursor-based mechanism for fetching rows from a database query result. Although it provides a robust way to retrieve data, it lacks native support for the Stream API, which would allow us to process data in a functional manner. By leveraging the Stream API, we can process rows more effectively, especially for tasks like filtering, mapping, and reduction. Let’s explore various approaches to bridge the gap between JDBC and Stream.
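To make the filtering, mapping, and reduction mentioned above concrete, here is a minimal sketch that runs those operations over an in-memory list standing in for rows already read from a ResultSet. The PipelineSketch class, the Row record, and the sample values are assumptions made for illustration only.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for rows that were already mapped from a ResultSet.
final class PipelineSketch {

    record Row(String symbol, double price, int volume) {}

    static final List<Row> ROWS = List.of(
            new Row("AAPL", 150.0, 1000),
            new Row("GOOGL", 95.0, 800),
            new Row("MSFT", 210.0, 1500));

    // Filtering + mapping: symbols of rows priced above the threshold.
    static List<String> symbolsAbove(double minPrice) {
        return ROWS.stream()
                .filter(row -> row.price() > minPrice)
                .map(Row::symbol)
                .collect(Collectors.toList());
    }

    // Filtering + reduction: total traded value (price * volume) above the threshold.
    static double tradedValueAbove(double minPrice) {
        return ROWS.stream()
                .filter(row -> row.price() > minPrice)
                .mapToDouble(row -> row.price() * row.volume())
                .sum();
    }
}
```

The rest of the article is about producing such a stream from a live ResultSet instead of an in-memory list.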
2. Process ResultSet using Stream API with Spliterators
One of the most versatile ways to convert a ResultSet to a stream is the Spliterator interface. A spliterator traverses the elements of a source and can optionally partition them for parallel processing; it is the abstraction streams are built on. We can create a custom spliterator that reads one row per step from the ResultSet and hand it to the Stream API. Because a ResultSet is read row by row through a single cursor, the stream we build here is sequential.
We will need a database-specific driver. For example, if you’re using H2 as your database, include its dependency as well:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.2.224</version>
<scope>test</scope>
</dependency>
We will also need to add the following JUnit 5 dependency for testing:
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.10.0</version>
<scope>test</scope>
</dependency>
Here’s an example of implementing a ResultSetSpliterator and using it with the Stream API.
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Spliterators;
import java.util.function.Consumer;

// Adapts a ResultSet to the Spliterator contract: one row per tryAdvance call.
public class ResultSetSpliterator<T> extends Spliterators.AbstractSpliterator<T> {
    private final ResultSet resultSet;
    private final RowMapper<T> rowMapper;

    public ResultSetSpliterator(ResultSet resultSet, RowMapper<T> rowMapper) {
        // The row count is unknown up front, so Long.MAX_VALUE is the size estimate.
        // NONNULL assumes the mapper never returns null.
        super(Long.MAX_VALUE, ORDERED | NONNULL);
        this.resultSet = resultSet;
        this.rowMapper = rowMapper;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        try {
            if (!resultSet.next()) return false; // no more rows
            action.accept(rowMapper.mapRow(resultSet));
            return true;
        } catch (SQLException e) {
            // tryAdvance cannot throw checked exceptions, so wrap the SQLException.
            throw new RuntimeException("Error processing ResultSet", e);
        }
    }
}
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Each public type below goes in its own source file.
@FunctionalInterface
public interface RowMapper<T> {
    T mapRow(ResultSet rs) throws SQLException;
}

public class ResultSetStreamExample {
    public static <T> Stream<T> stream(ResultSet resultSet, RowMapper<T> rowMapper) {
        // false = sequential stream; the caller remains responsible for closing the ResultSet.
        return StreamSupport.stream(new ResultSetSpliterator<>(resultSet, rowMapper), false);
    }
}
To demonstrate the spliterator, we use the RowMapper interface above to map each ResultSet row to a custom type, Stock, which represents a stock and is shown below.
public record Stock(String symbol, double price, int volume) {
@Override
public String toString() {
return "Stock[symbol=" + symbol + ", price=" + price + ", volume=" + volume + "]";
}
}
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
class ResultSetStreamExampleTest {
private Connection connection;
@BeforeEach
void setUp() throws Exception {
// Set up an in-memory H2 database and populate it with test data
connection = DriverManager.getConnection("jdbc:h2:mem:testdb", "sa", "");
try (Statement statement = connection.createStatement()) {
statement.execute("CREATE TABLE stocks (symbol VARCHAR(10), price DOUBLE, volume INT)");
statement.execute("INSERT INTO stocks VALUES ('AAPL', 150.0, 1000), ('GOOGL', 95.0, 800), ('MSFT', 210.0, 1500)");
}
}
@AfterEach
void tearDown() throws Exception {
// Clean up and close the database connection
if (connection != null && !connection.isClosed()) {
connection.close();
}
}
@Test
void testStreamProcessesResultSetCorrectly() throws Exception {
try (Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("SELECT symbol, price, volume FROM stocks ORDER BY symbol")) {
// Stream the ResultSet
List<Stock> filteredStocks = ResultSetStreamExample.stream(resultSet, rs -> new Stock(
rs.getString("symbol"),
rs.getDouble("price"),
rs.getInt("volume")
)).filter(stock -> stock.price() > 100) // Filter stocks with prices above $100
.collect(Collectors.toList());
// Validate the results
assertEquals(2, filteredStocks.size());
assertEquals(new Stock("AAPL", 150.0, 1000), filteredStocks.get(0));
assertEquals(new Stock("MSFT", 210.0, 1500), filteredStocks.get(1));
}
}
}
This example demonstrates how we can create a stream from a ResultSet using a Spliterator, allowing us to process each row in a stream pipeline.
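The stream returned by ResultSetStreamExample.stream leaves closing the ResultSet to the caller. One possible variation, sketched below, registers an onClose handler so that closing the stream also closes the ResultSet. ClosingResultSetStreams and SqlRowMapper are hypothetical names introduced only for this sketch; SqlRowMapper mirrors the article’s RowMapper so that the block is self-contained.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper, not part of the article's classes.
final class ClosingResultSetStreams {

    // Same shape as the article's RowMapper, redeclared to keep the sketch standalone.
    @FunctionalInterface
    interface SqlRowMapper<T> {
        T mapRow(ResultSet rs) throws SQLException;
    }

    static <T> Stream<T> stream(ResultSet rs, SqlRowMapper<T> mapper) {
        Spliterator<T> rows =
                new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
                    @Override
                    public boolean tryAdvance(Consumer<? super T> action) {
                        try {
                            if (!rs.next()) return false; // no more rows
                            action.accept(mapper.mapRow(rs));
                            return true;
                        } catch (SQLException e) {
                            throw new IllegalStateException("Error reading ResultSet", e);
                        }
                    }
                };
        // The handler runs when Stream.close() is called, e.g. at the end of try-with-resources.
        return StreamSupport.stream(rows, false).onClose(() -> {
            try {
                rs.close();
            } catch (SQLException e) {
                throw new IllegalStateException("Error closing ResultSet", e);
            }
        });
    }
}
```

With this variation the caller would wrap the stream itself in try-with-resources, for example try (Stream<Stock> stocks = ClosingResultSetStreams.stream(resultSet, mapper)) { ... }. Terminal operations such as collect do not close a stream on their own, so the explicit close is still required.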
3. Process ResultSet using Stream API with JOOQ
JOOQ (Java Object-Oriented Querying) is a popular library for database querying in Java, offering a powerful DSL and streamlined access to data. JOOQ natively supports streaming of query results, which is extremely useful for working with ResultSet data.
To use JOOQ in your Java project, you need to include the appropriate Maven dependency in your pom.xml. Here’s the dependency for JOOQ:
<dependency>
<groupId>org.jooq</groupId>
<artifactId>jooq</artifactId>
<version>3.18.5</version>
</dependency>
Additionally, you’ll need a database-specific driver. For example, if you’re using H2 as your database, include its dependency as well:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.2.224</version>
<scope>test</scope>
</dependency>
We will also need to add the following JUnit 5 dependency for testing:
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.10.0</version>
<scope>test</scope>
</dependency>
Here’s how we can use JOOQ’s streaming support to produce a Stream of the Stock record:
import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import java.util.stream.Stream;
public class JooqStreamExample {
public static Stream<Stock> getStockStream(DSLContext create) {
return create.selectFrom("stocks").stream()
.map(record -> new Stock(
record.get("symbol", String.class),
record.get("price", Double.class),
record.get("volume", Integer.class)));
}
}
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
class JooqStreamExampleTest {
private Connection connection;
private DSLContext create;
@BeforeEach
void setUp() throws Exception {
// Set up an in-memory H2 database and JOOQ DSLContext
connection = DriverManager.getConnection("jdbc:h2:mem:testdb", "sa", "");
create = DSL.using(connection, SQLDialect.H2);
// Create the stocks table and populate it with test data
create.execute("CREATE TABLE stocks (symbol VARCHAR(10), price DOUBLE, volume INT)");
create.execute("INSERT INTO stocks VALUES ('AAPL', 150.0, 1000), ('GOOGL', 95.0, 800), ('MSFT', 210.0, 1500)");
}
@AfterEach
void tearDown() throws Exception {
// Clean up resources
if (connection != null && !connection.isClosed()) {
connection.close();
}
}
@Test
void testGetStockStreamFiltersAndMapsCorrectly() {
// Get the stock stream
List<Stock> filteredStocks = JooqStreamExample.getStockStream(create)
.filter(stock -> stock.price() > 100) // Filter stocks with price > 100
.collect(Collectors.toList());
// Validate the results
assertEquals(2, filteredStocks.size());
assertEquals(new Stock("AAPL", 150.0, 1000), filteredStocks.get(0));
assertEquals(new Stock("MSFT", 210.0, 1500), filteredStocks.get(1));
}
}
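JOOQ simplifies the process by offering stream() directly on the query, eliminating the need for custom spliterators. The returned stream keeps a database cursor open, so the JOOQ documentation recommends closing it, for example with try-with-resources.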
4. Process ResultSet using Stream API with jdbc-stream
Another option is the jdbc-stream library, which is designed to convert a ResultSet directly into a Java Stream. The example below again maps rows to the Stock record.
In addition to the jdbc-stream library itself, we will need a database-specific driver. For example, if you’re using H2 as your database, include its dependency as well:
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>2.2.224</version>
<scope>test</scope>
</dependency>
We will also need to add the following JUnit 5 dependency for testing:
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.10.0</version>
<scope>test</scope>
</dependency>
Here’s an example of using the Stream API with jdbc-stream.
import com.pivovarit.stream.StreamSupport;
import java.sql.ResultSet;
import java.util.stream.Stream;
public class JdbcStreamExample {
public static Stream<Stock> getStockStream(ResultSet resultSet) {
return StreamSupport.stream(
resultSet,
rs -> new Stock(
rs.getString("symbol"),
rs.getDouble("price"),
rs.getInt("volume")
)
);
}
}
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
class JdbcStreamExampleTest {
private Connection connection;
private Statement statement;
@BeforeEach
void setUp() throws Exception {
// Set up an in-memory H2 database
connection = DriverManager.getConnection("jdbc:h2:mem:testdb", "sa", "");
statement = connection.createStatement();
// Create the stocks table and insert test data
statement.execute("CREATE TABLE stocks (symbol VARCHAR(10), price DOUBLE, volume INT)");
statement.execute("INSERT INTO stocks VALUES ('AAPL', 150.0, 1000), ('GOOGL', 95.0, 800), ('MSFT', 210.0, 1500)");
}
@AfterEach
void tearDown() throws Exception {
// Clean up resources
if (statement != null && !statement.isClosed()) {
statement.close();
}
if (connection != null && !connection.isClosed()) {
connection.close();
}
}
@Test
void testGetStockStreamFiltersAndMapsCorrectly() throws Exception {
try (ResultSet resultSet = statement.executeQuery("SELECT symbol, price, volume FROM stocks ORDER BY symbol")) {
// Get the stock stream and filter stocks with price > 100
List<Stock> filteredStocks = JdbcStreamExample.getStockStream(resultSet)
.filter(stock -> stock.price() > 100)
.collect(Collectors.toList());
// Validate the results
assertEquals(2, filteredStocks.size());
assertEquals(new Stock("AAPL", 150.0, 1000), filteredStocks.get(0));
assertEquals(new Stock("MSFT", 210.0, 1500), filteredStocks.get(1));
}
}
}
This library simplifies the process, making it easy to turn a ResultSet into a Stream with minimal boilerplate code.
5. Close Resources
Properly closing resources such as Connection, Statement, and ResultSet is crucial when working with JDBC to avoid resource leaks, which can lead to performance degradation and database connection issues. Java’s try-with-resources statement simplifies this by automatically managing the lifecycle of AutoCloseable resources.
Best Practices for Closing Resources
Streams created from a ResultSet (e.g., using a Spliterator or libraries like jdbc-stream) must ensure the ResultSet and associated resources are closed when the stream is no longer needed.
- Always Use Try-With-Resources: Resources such as Connection, Statement, and ResultSet should be declared within a try-with-resources block to ensure they are closed automatically when the block exits.
- Exception Handling: Exceptions during resource closing are suppressed if another exception occurs in the try block. These suppressed exceptions can still be accessed via Throwable.getSuppressed().
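The suppression behaviour described above can be observed without a database. The following sketch uses a hypothetical FailingResource whose close() always throws, standing in for a JDBC resource that fails while closing; the class names and messages are assumptions for illustration.

```java
// Hypothetical demo: shows how try-with-resources records a failure in close()
// as a suppressed exception when the try block has already thrown.
final class SuppressedCloseDemo {

    // Stand-in for a JDBC resource whose close() fails.
    static final class FailingResource implements AutoCloseable {
        @Override
        public void close() {
            throw new IllegalStateException("close failed");
        }
    }

    // Returns the exception that propagates out of the try-with-resources statement.
    static Throwable run() {
        try (FailingResource resource = new FailingResource()) {
            throw new IllegalArgumentException("body failed");
        } catch (RuntimeException e) {
            // e is the exception from the try block; the close() failure is attached to it.
            return e;
        }
    }
}
```

The exception from the try block is the one that propagates, and the exception thrown by close() is attached to it, so run().getSuppressed() holds a single IllegalStateException.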
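In the spliterator and jdbc-stream tests above, the ResultSet is declared in a try-with-resources block, so it is closed automatically when the block exits, whether successfully or due to an exception. The spliterator test manages its Statement the same way. The remaining resources (the Connection in every test, and the Statement in the jdbc-stream test) are closed explicitly in the @AfterEach method instead.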
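One consequence of closing resources at block exit is worth spelling out: streams are lazy, so a stream that escapes the try-with-resources block is only read after its source has been closed. The sketch below illustrates this with a hypothetical in-memory FakeCursor standing in for a ResultSet; the class and method names are assumptions, and a real driver would typically report the problem as an SQLException rather than the IllegalStateException used here.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class LazyStreamPitfall {

    // Hypothetical stand-in for a ResultSet: reading after close() fails.
    static final class FakeCursor implements AutoCloseable, Iterator<String> {
        private final Iterator<String> rows = List.of("AAPL", "GOOGL", "MSFT").iterator();
        private boolean closed;

        @Override
        public boolean hasNext() {
            if (closed) throw new IllegalStateException("cursor is closed");
            return rows.hasNext();
        }

        @Override
        public String next() {
            return rows.next();
        }

        @Override
        public void close() {
            closed = true;
        }

        // Lazily streams the remaining rows; nothing is read until a terminal operation runs.
        Stream<String> stream() {
            return StreamSupport.stream(
                    Spliterators.spliteratorUnknownSize(this, Spliterator.ORDERED), false);
        }
    }

    // Problematic: the cursor is closed before the caller runs a terminal operation.
    static Stream<String> leaky() {
        try (FakeCursor cursor = new FakeCursor()) {
            return cursor.stream();
        }
    }

    // Safer: consume the stream while the cursor is still open and return the data.
    static List<String> safe() {
        try (FakeCursor cursor = new FakeCursor()) {
            return cursor.stream().collect(Collectors.toList());
        }
    }
}
```

In this sketch, collecting the stream returned by leaky() fails because the cursor is already closed, while safe() returns all three rows. This is why the tests in this article run the terminal collect operation inside the block that owns the ResultSet.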
6. Conclusion
Processing JDBC ResultSet with the Stream API in Java can significantly improve the readability and functionality of data processing code. We covered several approaches:
- Using a custom Spliterator to transform a ResultSet into a stream.
- Leveraging JOOQ’s built-in streaming support for a seamless, high-level approach.
- Using jdbc-stream to simplify the process with a lightweight library.
Each method offers distinct advantages, so the best choice depends on your specific requirements. Whichever approach you choose, remember to manage your resources effectively, using try-with-resources to avoid resource leaks such as unclosed connections, statements, and result sets.