ResultSet as Stream in Java

 Java ResultSet as Stream response

 

In this article I describe how to expose a JDBC ResultSet as a Stream and how to return it as a streaming REST API response in Java. I am using Java 9 to implement the solution. Let's go through the implementation step by step.

 

First, let us discuss the requirements, or scenarios.

1. You need to provide a stream response to the client, so that the client can process the data as a stream.

2. You need to fetch data by applying filters/criteria, but the data set is too large to keep in memory.

 

The following is the tech stack I have used to solve the problem.

1. Spring Boot

2. REST API

3. H2 in-memory database

4. Java 9

 

Step 1: Set up the H2 database by adding the following properties to the application.properties file.

spring.datasource.url=jdbc:h2:mem:test
spring.datasource.driverClassName=org.h2.Driver
spring.datasource.username=sa
spring.datasource.password=
spring.jpa.database-platform=org.hibernate.dialect.H2Dialect  
spring.h2.console.enabled=true

 

Step 2: pom.xml

<properties>
        <java.version>9</java.version>
</properties>

<dependencies>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-boot-starter</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <!-- <optional>true</optional> -->
            <scope>provided</scope>
        </dependency>
        <dependency> 
            <groupId>com.h2database</groupId> 
            <artifactId>h2</artifactId> 
            <scope>runtime</scope> 
            <version>1.4.193</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
        </dependency> 
    </dependencies>

Step 3: Entity class whose data you are going to fetch. Here I am using a simple class.

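import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Lombok (already in pom.xml) generates the members the article only mentions:
// @NoArgsConstructor and @AllArgsConstructor create both constructors,
// @Data creates the getters and setters.
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {

    private String empid;
    private String empname;

}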

Step 3: ResultSetStreamInvocationHandler 

Using Java reflection (a dynamic proxy), I intercept method calls on the stream. If you are familiar with the filter chain in servlets, this works like a filter that runs before the actual servlet call: the invoke method is called before any method that is called on the stream.

package com.java9.streams.resultsetasstreams;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class ResultSetStreamInvocationHandler<T> implements InvocationHandler{

    private Stream<T> stream;
    private PreparedStatement st;
    private ResultSet rs;
    
    public void setup(PreparedStatement st, Function<ResultSet, T> mappingFunction) throws SQLException{
        this.st = st;
        rs = st.executeQuery();
        stream = Stream.generate(new ResultSetSupplier(rs, mappingFunction));
    }

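    @Override
    public Object invoke(Object proxy, Method method, Object[] args)
            throws Throwable {

        if (method == null)
            throw new RuntimeException("method must not be null");

        // Closing the stream also closes the statement (and with it the ResultSet)
        if (method.getName().equals("close") && args == null) {
            if (st != null) {
                st.close();
            }
        }
        try {
            return method.invoke(stream, args); // delegates to the actual stream method
        } catch (java.lang.reflect.InvocationTargetException e) {
            throw e.getCause(); // rethrow the original exception, not the reflection wrapper
        }
    }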
    
    private class ResultSetSupplier implements Supplier<T>{       
        private final ResultSet rs;
        private final Function<ResultSet, T> mappingFunction;

        private ResultSetSupplier(ResultSet rs, Function<ResultSet, T> mappingFunction) {
            this.rs = rs;
            this.mappingFunction = mappingFunction;
        }

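        @Override
        public T get() {
            try {
                if (rs.next())
                    return mappingFunction.apply(rs);
            } catch (SQLException e) {
                // Surface the failure; returning null here would look like end of data
                throw new IllegalStateException("Failed to read from ResultSet", e);
            }
            return null; // null tells the caller that there are no more rows
        }
    }
}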

 

The idea here is very simple: I am converting a Supplier into a stream. A Supplier takes no arguments and produces a value of type T. In our case the supplier supplies data from the ResultSet, and mappingFunction converts each row of the ResultSet into our entity class, i.e. User.
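To make the Supplier idea easier to try out without a database, here is a minimal sketch. It assumes an in-memory Iterator in place of the ResultSet; the class SupplierStreamDemo and the helpers nullTerminated and readAll are hypothetical names used only for this illustration. It needs Java 9 or later because of takeWhile.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SupplierStreamDemo {

    // Stand-in for ResultSetSupplier: returns the next row, or null when exhausted.
    static <T> Supplier<T> nullTerminated(Iterator<T> rows) {
        return () -> rows.hasNext() ? rows.next() : null;
    }

    // Builds an infinite stream from the supplier and cuts it at the first null.
    static List<String> readAll(List<String> rows) {
        return Stream.generate(nullTerminated(rows.iterator()))
                .takeWhile(row -> row != null)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(readAll(Arrays.asList("alice", "bob", "carol")));
    }
}
```

One caveat: Stream.generate is documented as returning an unordered stream, and the takeWhile Javadoc describes its behavior on unordered streams as nondeterministic. A sequential stream like this one is commonly used this way, but the sketch should not be read as a guarantee for parallel streams.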

 

Step 4: ResultSetStream

This class returns the ResultSet as a stream.

package com.java9.streams.resultsetasstreams;

import java.lang.reflect.Proxy;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.function.Function;
import java.util.stream.Stream;

public class ResultSetStream<T>{

    @SuppressWarnings("unchecked")
    public Stream<T> getStream(PreparedStatement st, Function<ResultSet, T> mappingFunction) throws SQLException{
        final ResultSetStreamInvocationHandler<T> handler = new ResultSetStreamInvocationHandler<T>();
        handler.setup(st, mappingFunction);
        return (Stream<T>) Proxy.newProxyInstance(getClass().getClassLoader(),
                new Class<?>[] {Stream.class},
                handler);
    }
}

This class is very simple to understand: its only purpose is to return the stream to the calling method. We have already talked about how the invoke method intercepts calls on the Stream.
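The interception can also be seen in isolation. The following is a small sketch, not the article's handler: it assumes an AtomicBoolean flag in place of the st.close() call, and StreamProxyDemo and closeAware are hypothetical names for this illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamProxyDemo {

    // Wraps a stream in a proxy that sets `closed` when close() is called,
    // standing in for the statement cleanup done by the real handler.
    @SuppressWarnings("unchecked")
    static <T> Stream<T> closeAware(Stream<T> target, AtomicBoolean closed) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("close") && args == null) {
                closed.set(true);
            }
            try {
                return method.invoke(target, args); // delegate to the real stream
            } catch (InvocationTargetException e) {
                throw e.getCause();
            }
        };
        return (Stream<T>) Proxy.newProxyInstance(
                StreamProxyDemo.class.getClassLoader(),
                new Class<?>[] {Stream.class},
                handler);
    }

    public static void main(String[] args) {
        AtomicBoolean closed = new AtomicBoolean(false);
        List<String> upper;
        // try-with-resources calls close() on the proxy when the block ends
        try (Stream<String> names = closeAware(Stream.of("alice", "bob"), closed)) {
            upper = names.map(String::toUpperCase).collect(Collectors.toList());
        }
        System.out.println(upper + " closed=" + closed.get());
    }
}
```

Note that only calls made on the proxy itself go through invoke. The stream returned by an intermediate operation such as map is an ordinary stream, so in this sketch close has to be called on the proxy (here via try-with-resources) for the cleanup to run.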

 

Step 5: Controller class, i.e. UserController, through which I return a stream response of User entities from the database. I am not writing a separate service layer here, so everything is at the controller level itself.

package com.java9.streams.resultsetasstreams;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.function.Function;
import java.util.stream.Stream;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import lombok.RequiredArgsConstructor;

@RestController
@RequiredArgsConstructor
public class UserController {

    @GetMapping("/getusers")
    public ResponseEntity<Stream<User>> getCurrentUser() {
        Stream<User> userSteream = fetchRecords();
       
        return ResponseEntity.ok()
                .body(userSteream.takeWhile(u -> u != null));
    }
    
    public Stream<User> fetchRecords() {
        ResultSetStream<User> resStream = new ResultSetStream<>();

        // Maps the current row of the ResultSet to a User
        Function<ResultSet, User> userFunction = res -> {
            try {
                return new User(res.getString("empid"), res.getString("empname"));
            } catch (SQLException e) {
                e.printStackTrace();
                return new User(); // fall back to an empty User if the row cannot be read
            }
        };

        try {
            Connection con = getConnection();
            PreparedStatement st = con.prepareStatement("SELECT * FROM TBL_EMPLOYEES");
            return resStream.getStream(st, userFunction);
        } catch (SQLException e) {
            e.printStackTrace();
            return Stream.empty(); // nothing to stream if the query could not be set up
        }
    }

    // Assumption: this helper is not shown in the article. It is sketched here
    // with the H2 settings from application.properties (Step 1).
    private Connection getConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:h2:mem:test", "sa", "");
    }
}

 

- userFunction: It takes a ResultSet as its argument and converts the current row into the entity class, i.e. User.

- userSteream.takeWhile(u -> u != null): Our stream is unbounded, i.e. we don't know in advance when the data from the database will end or how much of it there is, so we need the takeWhile method to exit the otherwise infinite stream. It terminates the stream as soon as the get method of the ResultSetSupplier class returns a null record. takeWhile was introduced in Java 9, so it cannot be used with Java 8, although you can implement your own version of takeWhile and use it with Java 8 streams.
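The last point mentions writing your own takeWhile for Java 8. One possible way to do that, sketched here under the assumption that a sequential stream is enough, is to wrap the source Spliterator. TakeWhileJava8 and its static takeWhile method are hypothetical names and are not part of the JDK or of the article's repository.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class TakeWhileJava8 {

    // Returns a sequential stream that ends at the first element failing the predicate.
    static <T> Stream<T> takeWhile(Stream<T> source, Predicate<? super T> predicate) {
        Spliterator<T> in = source.spliterator();
        Spliterator<T> out =
                new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
            private boolean taking = true;

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!taking) {
                    return false;
                }
                boolean advanced = in.tryAdvance(item -> {
                    if (predicate.test(item)) {
                        action.accept(item);
                    } else {
                        taking = false; // stop at the first non-matching element
                    }
                });
                return advanced && taking;
            }
        };
        // Closing the returned stream also closes the source stream
        return StreamSupport.stream(out, false).onClose(source::close);
    }

    public static void main(String[] args) {
        Iterator<String> rows = Arrays.asList("alice", "bob").iterator();
        Stream<String> infinite = Stream.<String>generate(() -> rows.hasNext() ? rows.next() : null);
        List<String> result = TakeWhileJava8.<String>takeWhile(infinite, r -> r != null)
                .collect(Collectors.toList());
        System.out.println(result);
    }
}
```

The wrapper pulls one element at a time from the source, so it can be used on an infinite supplier-backed stream like the one in this article.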

 

For scenario #2, you need to add a filter call on the stream to fetch only the filtered data.
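A short sketch of such a filter follows, again with an in-memory Iterator standing in for the ResultSet. FilterAfterTakeWhile and namesStartingWith are hypothetical names for this illustration. The order of the two calls matters: if filter came before takeWhile and dropped the terminating null, or dereferenced it, the stream would never end or would fail with a NullPointerException.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FilterAfterTakeWhile {

    static List<String> namesStartingWith(List<String> rows, String prefix) {
        Iterator<String> it = rows.iterator();
        return Stream.<String>generate(() -> it.hasNext() ? it.next() : null)
                .takeWhile(name -> name != null)          // end-of-data check comes first
                .filter(name -> name.startsWith(prefix))  // then the actual criteria
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(namesStartingWith(Arrays.asList("anna", "bob", "alex"), "a"));
    }
}
```

Each row is still read from the source before it is tested, so the filter saves memory on the Java side but not the work of fetching the rows.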

 

Github code link: https://github.com/kulbhushanchaskar/java-resultset-as-stream

 
