Core Java

Processing JDBC ResultSet with Stream API

Java’s Stream API, introduced in Java 8, is an incredibly powerful tool for handling collections of data in a functional style. However, when working with JDBC, which relies on ResultSet to retrieve database query results, directly integrating the Stream API with JDBC ResultSet can be challenging. In this article, we’ll explore various methods to process a ResultSet using the Stream API effectively, covering approaches like spliterators, third-party libraries such as JOOQ and jdbc-stream, and emphasizing best practices for resource management. All examples are tailored for Java 21, showcasing how to seamlessly integrate the Stream API with JDBC ResultSet.

1. Overview

JDBC ResultSet is a cursor-based mechanism for fetching rows from a database query result. Although it provides a robust way to retrieve data, it lacks native support for the Stream API, which would allow us to process data in a functional manner. By leveraging the Stream API, we can process rows more effectively, especially for tasks like filtering, mapping, and reduction. Let’s explore various approaches to bridge the gap between JDBC and Stream.

2. Process Resultset using Stream API with Spliterators

One of the most versatile ways to convert a ResultSet to a stream is by using the Spliterator interface. A spliterator is a special iterator optimized for parallelism, capable of splitting elements for efficient processing. We can create a custom spliterator for the ResultSet to integrate it with streams.

We will need a database-specific driver. For example, if you’re using H2 as your database, include its dependency as well:

    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <version>2.2.224</version>
        <scope>test</scope>
    </dependency>

We will also need to add the following JUnit 5 dependency for testing

    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>5.10.0</version>
        <scope>test</scope>
    </dependency>

Here’s an example of implementing a ResultSetSpliterator and using it with Stream API.

import java.sql.*;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ResultSetSpliterator<T> extends Spliterators.AbstractSpliterator<T> {
    private final ResultSet resultSet;
    private final RowMapper<T> rowMapper;

    public ResultSetSpliterator(ResultSet resultSet, RowMapper<T> rowMapper) {
        super(Long.MAX_VALUE, ORDERED | NONNULL);
        this.resultSet = resultSet;
        this.rowMapper = rowMapper;
    }

    @Override
    public boolean tryAdvance(java.util.function.Consumer<? super T> action) {
        try {
            if (!resultSet.next()) return false;
            action.accept(rowMapper.mapRow(resultSet));
            return true;
        } catch (SQLException e) {
            throw new RuntimeException("Error processing ResultSet", e);
        }
    }
}

import java.sql.ResultSet;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@FunctionalInterface
public interface RowMapper<T> {
    T mapRow(ResultSet rs) throws SQLException;
}

public class ResultSetStreamExample {
    public static <T> Stream<T> stream(ResultSet resultSet, RowMapper<T> rowMapper) {
        return StreamSupport.stream(new ResultSetSpliterator<>(resultSet, rowMapper), false);
    }
}

To demonstrate the use of spliterator, we need to define a RowMapper interface that maps a ResultSet row to a custom type Stock, representing a stock shown below.

public record Stock(String symbol, double price, int volume) {
    @Override
    public String toString() {
        return "Stock[symbol=" + symbol + ", price=" + price + ", volume=" + volume + "]";
    }
}
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;

class ResultSetStreamExampleTest {

    private Connection connection;

    @BeforeEach
    void setUp() throws Exception {
        // Set up an in-memory H2 database and populate it with test data
        connection = DriverManager.getConnection("jdbc:h2:mem:testdb", "sa", "");
        try (Statement statement = connection.createStatement()) {
            statement.execute("CREATE TABLE stocks (symbol VARCHAR(10), price DOUBLE, volume INT)");
            statement.execute("INSERT INTO stocks VALUES ('AAPL', 150.0, 1000), ('GOOGL', 95.0, 800), ('MSFT', 210.0, 1500)");
        }
    }

    @AfterEach
    void tearDown() throws Exception {
        // Clean up and close the database connection
        if (connection != null && !connection.isClosed()) {
            connection.close();
        }
    }

    @Test
    void testStreamProcessesResultSetCorrectly() throws Exception {
        try (Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("SELECT symbol, price, volume FROM stocks")) {

            // Stream the ResultSet
            List<Stock> filteredStocks = ResultSetStreamExample.stream(resultSet, rs -> new Stock(
                rs.getString("symbol"),
                rs.getDouble("price"),
                rs.getInt("volume")
            )).filter(stock -> stock.price() > 100) // Filter stocks with prices above $100
              .collect(Collectors.toList());

            // Validate the results
            assertEquals(2, filteredStocks.size());
            assertEquals(new Stock("AAPL", 150.0, 1000), filteredStocks.get(0));
            assertEquals(new Stock("MSFT", 210.0, 1500), filteredStocks.get(1));
        }
    }
}

This example demonstrates how we can create a stream from a ResultSet using Spliterator, allowing us to process each row in a stream pipeline.

3. Process Resultset using Stream API with JOOQ

JOOQ (Java Object-Oriented Querying) is a popular library for database querying in Java, offering a powerful DSL and streamlined access to data. JOOQ natively supports streaming of query results, which is extremely useful for working with ResultSet data.

To use JOOQ in your Java project, you need to include the appropriate Maven dependency in your pom.xml. Here’s the dependency for JOOQ:

    <dependency>
        <groupId>org.jooq</groupId>
        <artifactId>jooq</artifactId>
        <version>3.18.5</version>
    </dependency>

Additionally, you’ll need a database-specific driver. For example, if you’re using H2 as your database, include its dependency as well:

    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <version>2.2.224</version>
        <scope>test</scope>
    </dependency>

We will also need to add the following JUnit 5 dependency for testing

    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>5.10.0</version>
        <scope>test</scope>
    </dependency>

Here’s how we can use JOOQ’s streaming support with Stream using the Stock record:

import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;

import java.util.stream.Stream;

public class JooqStreamExample {
    public static Stream<Stock> getStockStream(DSLContext create) {
        return create.selectFrom("stocks").stream()
                     .map(record -> new Stock(
                         record.get("symbol", String.class),
                         record.get("price", Double.class),
                         record.get("volume", Integer.class)));
    }
}
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.List;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;

class JooqStreamExampleTest {

    private Connection connection;
    private DSLContext create;

    @BeforeEach
    void setUp() throws Exception {
        // Set up an in-memory H2 database and JOOQ DSLContext
        connection = DriverManager.getConnection("jdbc:h2:mem:testdb", "sa", "");
        create = DSL.using(connection, SQLDialect.H2);

        // Create the stocks table and populate it with test data
        create.execute("CREATE TABLE stocks (symbol VARCHAR(10), price DOUBLE, volume INT)");
        create.execute("INSERT INTO stocks VALUES ('AAPL', 150.0, 1000), ('GOOGL', 95.0, 800), ('MSFT', 210.0, 1500)");
    }

    @AfterEach
    void tearDown() throws Exception {
        // Clean up resources
        if (connection != null && !connection.isClosed()) {
            connection.close();
        }
    }

    @Test
    void testGetStockStreamFiltersAndMapsCorrectly() {
        // Get the stock stream
        List<Stock> filteredStocks = JooqStreamExample.getStockStream(create)
                                                       .filter(stock -> stock.price() > 100)  // Filter stocks with price > 100
                                                       .collect(Collectors.toList());

        // Validate the results
        assertEquals(2, filteredStocks.size());
        assertEquals(new Stock("AAPL", 150.0, 1000), filteredStocks.get(0));
        assertEquals(new Stock("MSFT", 210.0, 1500), filteredStocks.get(1));
    }
}

JOOQ simplifies the process, offering stream() directly on the query, eliminating the need for custom spliterators.

4. Process Resultset using Stream API with jdbc-stream

Another option is using the jdbc-stream library, designed to provide a direct way of converting ResultSet into a Java Stream using the Stock record.

We will need a database-specific driver. For example, if you’re using H2 as your database, include its dependency as well:

    <dependency>
        <groupId>com.h2database</groupId>
        <artifactId>h2</artifactId>
        <version>2.2.224</version>
        <scope>test</scope>
    </dependency>

We will also need to add the following JUnit 5 dependency for testing

    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>5.10.0</version>
        <scope>test</scope>
    </dependency>

Here’s an example of using Stream API with jdbc-stream .

import com.pivovarit.stream.StreamSupport;

import java.sql.ResultSet;
import java.util.stream.Stream;

public class JdbcStreamExample {
    public static Stream<Stock> getStockStream(ResultSet resultSet) {
        return StreamSupport.stream(
            resultSet,
            rs -> new Stock(
                rs.getString("symbol"),
                rs.getDouble("price"),
                rs.getInt("volume")
            )
        );
    }
}
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.List;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;

class JdbcStreamExampleTest {

    private Connection connection;
    private Statement statement;

    @BeforeEach
    void setUp() throws Exception {
        // Set up an in-memory H2 database
        connection = DriverManager.getConnection("jdbc:h2:mem:testdb", "sa", "");
        statement = connection.createStatement();

        // Create the stocks table and insert test data
        statement.execute("CREATE TABLE stocks (symbol VARCHAR(10), price DOUBLE, volume INT)");
        statement.execute("INSERT INTO stocks VALUES ('AAPL', 150.0, 1000), ('GOOGL', 95.0, 800), ('MSFT', 210.0, 1500)");
    }

    @AfterEach
    void tearDown() throws Exception {
        // Clean up resources
        if (statement != null && !statement.isClosed()) {
            statement.close();
        }
        if (connection != null && !connection.isClosed()) {
            connection.close();
        }
    }

    @Test
    void testGetStockStreamFiltersAndMapsCorrectly() throws Exception {
        try (ResultSet resultSet = statement.executeQuery("SELECT symbol, price, volume FROM stocks")) {
            // Get the stock stream and filter stocks with price > 100
            List<Stock> filteredStocks = JdbcStreamExample.getStockStream(resultSet)
                                                          .filter(stock -> stock.price() > 100)
                                                          .collect(Collectors.toList());

            // Validate the results
            assertEquals(2, filteredStocks.size());
            assertEquals(new Stock("AAPL", 150.0, 1000), filteredStocks.get(0));
            assertEquals(new Stock("MSFT", 210.0, 1500), filteredStocks.get(1));
        }
    }
}

This library simplifies the process, making it easy to turn a ResultSet into a Stream with minimal boilerplate code.

5. Close Resources

Properly closing resources such as Connection, Statement, and ResultSet is crucial when working with JDBC to avoid resource leaks, which can lead to performance degradation and database connection issues. Java’s try-with-resources statement simplifies this by automatically managing the lifecycle of AutoCloseable resources.

Best Practices for Closing Resources

Streams created from ResultSet (e.g., using Spliterator or libraries like jdbc-stream) must ensure the ResultSet and associated resources are closed when the stream is no longer needed.

  1. Always Use Try-With-Resources: Resources such as Connection, Statement, and ResultSet should be declared within a try-with-resources block to ensure they are closed automatically when the block exits.
  2. Exception Handling: Exceptions during resource closing are suppressed if another exception occurs in the try block. These suppressed exceptions can still be accessed via Throwable.getSuppressed().

In all previous examples, AutoCloseable JDBC resources (Connection, Statement, ResultSet) are closed automatically when the block exits, whether successfully or due to an exception.

6. Conclusion

Processing JDBC ResultSet with the Stream API in Java can significantly improve the readability and functionality of data processing code. We covered several approaches:

  • Using custom Spliterator to transform ResultSet into a stream.
  • Leveraging JOOQ’s built-in streaming support for a seamless, high-level approach.
  • Using jdbc-stream to simplify the process with a lightweight library.

Each method offers distinct advantages, so the best choice depends on your specific requirements. Whichever approach you choose, remember to manage your resources effectively, utilizing try-with-resources to avoid memory leaks.

Ashraf Sarhan

With over 8 years of experience in the field, I have developed and maintained large-scale distributed applications for various domains, including library, audio books, and quant trading. I am passionate about OpenSource, CNCF/DevOps, Microservices, and BigData, and I constantly seek to learn new technologies and tools. I hold two Oracle certifications in Java programming and business component development.
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Back to top button