Enterprise Java

Reactive Web Applications Using SpringWebFlux

1. Introduction to Reactive Programming

Reactive programming is a term coined for the applications that has the following characteristics:

  • Non-blocking applications
  • Event-driven and asynchronous
  • Require a small number of threads to scale vertically (i.e. within the JVM)

Just like object-oriented programming, functional programming, or procedural programming, reactive programming is just another programming paradigm. It makes our program: Responsive, Resilient, Elastic.

2. Reactive Programming in Spring

The Spring Framework uses Reactor internally for its own reactive support. Reactor is an implementation of Reactive Streams (publishers, introduced in Java9). Reactor has the following two datatypes:

  • Flux (it is a Stream which can emit 0 or more elements)
  • Mono (it is a Stream which can emit 0 or 1 element)

Spring exposes these types from its API, thereby making the application reactive.

In Spring 5, a new module has been introduced called WebFlux, which gives the support for creating the reactive web applications using: HTTP(REST) and web sockets.

Spring Web Flux supports the following two models:

  • Functional Model
  • Annotation Model

In this article we will explore the Functional Model.

Following table compares normal Spring with Web Flux:

Traditional StackReactive Stack
Spring Web MVCSpring WebFlux
Controller and Handler MappingRouter Functions
Servlet APIHTTP/ Reactive Stream
Servlet ContainersAny servlet container with support for Servlet 3.1+, Tomcat 8.x,Jetty, Netty, UnderTow

3. Use Case

A REST API using Spring Web Flux has to be created for an Employee Management system that will expose the CRUD on Employee.

Note: DAO layer of the Project is hard coded.

4. Software and Environment needed

  • Java: 1.8 or above
  • Maven: 3.3.9 or above
  • Eclipse Luna or above
  • Spring Boot: 2.0.0.M4
  • Spring Boot Starter WebFlux
  • Postman for Testing the Application

5. Flow of the Application

Spring5 WebFlux’s Functional model is an alternative to using Spring MVC style annotations. In Spring WebFlux Functional model, Routers and handler functions are used to create a MVC application.HTTP request is routed via router function (alternative to annotations like @RequestMapping) and request is handled via handler function (alternative to @Controller handler methods).

Each handler function will take ServerRequest (org.springframework.web.reactive.function.server.ServerRequest) as a parameter and as result will return the Mono<ServerResponse> or Flux<ServerResponse> (org.springframework.web.reactive.function.server.ServerResponse).

6. Code For the Use Case and Description

pom.xml

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.webflux</groupId>
    <artifactId>Demo_Spring_MVC_Web_Flux</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.M4</version>
        <relativePath />
        <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8
             </project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8
            </project.reporting.outputEncoding>
             <!-- Configuring Java 8 for the Project -->
        <java.version>1.8</java.version>
    </properties>
      <!--Excluding Embedded tomcat to make use of the Netty Server-->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
               <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-tomcat</artifactId>
               </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

EmployeeDAO.java

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.webflux.dao;
import java.util.LinkedHashMap;
import java.util.Map;
import org.springframework.stereotype.Repository;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import com.webflux.bussiness.bean.Employee;
@Repository
public class EmployeeDAO {
    /**
     * Map is used to Replace the Database
     * */
    static public Map mapOfEmloyeess = new LinkedHashMap();
    static int count=10004;
    static
    {
       mapOfEmloyeess.put(10001, new Employee("Jack",10001,12345.6,1001));
       mapOfEmloyeess.put(10002, new Employee("Justin",10002,12355.6,1002));
       mapOfEmloyeess.put(10003, new Employee("Eric",10003,12445.6,1003));
    }
 
    /**
     * Returns all the Existing Employees as Flux
     * */
    public Flux getAllEmployee(){
        return Flux.fromStream(mapOfEmloyeess.values().stream());      
    }
 
    /**Get Employee details using EmployeeId .
     * Returns a Mono response with Data if Employee is Found
     * Else returns a null
     * */
    public Mono getEmployeeDetailsById(int id){
         
        Monores = null;
        Employee emp =mapOfEmloyeess.get(id);
        if(emp!=null){
            res=Mono.just(emp);
        }
        return res;
    }
 
    /**Create Employee details.
     * Returns a Mono response with auto-generated Id
     * */
    public Mono addEmployee(Employee employee){
        count++;
        employee.setEmployeeId(count);
        mapOfEmloyeess.put(count, employee);
        return Mono.just(count);
    }
 
    /**Update the Employee details,
     * Receives the Employee Object and returns the updated Details
     * as Mono 
     * */
    public Mono updateEmployee (Employee employee){
        mapOfEmloyeess.put(employee.getEmployeeId(), employee);
        return Mono.just(employee);
    }
 
    /**Delete the Employee details,
     * Receives the EmployeeID and returns the deleted employee Details
     * as Mono 
     * */
    public Mono removeEmployee (int id){
        Employee emp= mapOfEmloyeess.remove(id);
        return Mono.just(emp);
    }  
}

It can be observed that all the methods of the EmployeeDAO are returning either Mono or Flux Response, thereby making the DAO Layer Reactive.

EmployeeHandler.java

001
002
003
004
005
006
007
008
009
010
011
012
013
014
015
016
017
018
019
020
021
022
023
024
025
026
027
028
029
030
031
032
033
034
035
036
037
038
039
040
041
042
043
044
045
046
047
048
049
050
051
052
053
054
055
056
057
058
059
060
061
062
063
064
065
066
067
068
069
070
071
072
073
074
075
076
077
078
079
080
081
082
083
084
085
086
087
088
089
090
091
092
093
094
095
096
097
098
099
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package com.webflux.web.handler;
import static org.springframework.web.reactive.function.BodyInserters.fromObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import com.webflux.bussiness.bean.Employee;
import com.webflux.dao.EmployeeDAO;
@Controller
public class EmployeeHandler {
 
    @Autowired
    private EmployeeDAO employeeDAO;
     
     
    /**
     * Receives a ServerRequest.
     * Invokes the method getAllEmployee() from EmployeeDAO.
     * Prepares a Mono and returns the same.
     * */  
      public Mono getEmployeeDetails(ServerRequest request) {
        Flux  res=employeeDAO.getAllEmployee();    
        return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
        .body(res,Employee.class);
    }
     
    /**
     * Receives a ServerRequest.
     * Extracts the Path Variable (named id) from the Request.
     * Invokes the method [getEmployeeDetailsById()] from EmployeeDAO.
     * Verifies if the object returned in the previous step is null
     * then returns a Bad request with appropriate message.
     * Else Returns the Mono with the Employee Data.
     * */
    public Mono getEmployeeDetailByEmployeeId(ServerRequest request) {
        //Extracts the Path Variable id from the Request
        int id =Integer.parseInt(request.pathVariable("id"));
        Mono employee = employeeDAO.getEmployeeDetailsById(id);
        Mono res= null;
        if(employee==null){
            res=ServerResponse.badRequest().body
                                    (fromObject("Please give a valid employee Id"));
        }
        else{
            //Converting Mono of Mono type to Mono
            res=employee.flatMap(x->ServerResponse.ok().body(fromObject(x)));
        }
        return res;
    }
 
    /**
     * Receives a ServerRequest.
     * Makes use of BodyExtractors and Extracts the Employee Data as
     * Mono from the ServerRequest.
     * Invokes the method [addEmployee()] of the EmployeeDAO.
     * Prepares a Mono and returns the same.
     * */
    public Mono addEmployee(ServerRequest request) {
        Mono requestBodyMono = request.body(BodyExtractors.toMono(Employee.class));
            Mono mono= employeeDAO.addEmployee(requestBodyMono.block());
        //Converting Mono of Mono type to Mono
        Mono res= mono.flatMap(x->ServerResponse.ok().body
                                    (fromObject("Employee Created with Id"+x)));
        return res;
    }
 
     
    /**
     * Receives a ServerRequest.
     * Makes use of BodyExtractors and Extracts the Employee Data as
     * Mono from the ServerRequest.
     * Finds the Employee and updates the details by invoking updateEmployee() of  
     * EmployeeDAO.
     * Prepares a Mono and returns the same.
     * */
    public Mono updateEmployee(ServerRequest request) {
        Mono requestBodyMono = request.body(BodyExtractors.toMono(Employee.class));
        Employee employee =  requestBodyMono.block();
          Mono employeeRet = employeeDAO.getEmployeeDetailsById(employee.getEmployeeId());
          Mono res= null;
        if(employeeRet==null){
            res=ServerResponse.badRequest().body(fromObject
                            ("Please Give valid employee details to update"));
        }
 
        else{
            Mono emp= employeeDAO.updateEmployee(employee);
             
            //Converting Mono of Mono type to Mono
            res=emp.flatMap(x->ServerResponse.ok().body(fromObject(x)));
        }
        return res;            
    }
 
    /**
     * Receives a ServerRequest.
     * Makes use of BodyExtractors and Extracts the Employee Data as
     * Mono from the ServerRequest.
     * Finds the Employee and deletes the details by invoking removeEmployee() of  
     * EmployeeDAO.
     * Prepares a Mono and returns the same.
     * */
    public Mono deleteEmployee(ServerRequest request) {
        int myId = Integer.parseInt(request.pathVariable("id"));
        Mono res= null;
        if (employeeDAO.getEmployeeDetailsById(myId) == null) {
            res=ServerResponse.badRequest().body
                      (fromObject("Please Give valid employee details to delete"));
        }else{
          Mono employee = employeeDAO.removeEmployee(myId);
         
              //Converting Mono of Mono type to Mono
          res=employee.flatMap(x->ServerResponse.ok().body(fromObject(x)));
           
        }
        return res;
    }
}

It can be observed that all the methods of the Handler are returning Mono<ServerResponse>, thereby making the Presentation Layer Reactive.

Note: Event Handler method should accept the ServerRequest and should return Mono<ServerResponse>

RouterConfiguration.java

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.webflux.web.router.config;
import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
import static org.springframework.web.reactive.function.server.RequestPredicates.PUT;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import com.webflux.web.handler.EmployeeHandler;
@Configuration
/**
 * Router is configuration class.
 * It links the incoming requests with appropriate HTTP methods to the
 * respective method of the EmployeeHandler.
 * Method references are used for the mapping.
 * */
public class RouterConfiguration{
    @Autowired
    EmployeeHandler employeeHandler;
     
    @Bean
    public RouterFunction monoRouterFunction() {
     RouterFunctionrouterFunction=  RouterFunctions.       
              route(GET("/emp/controller/getDetails").
              and(accept(MediaType.APPLICATION_JSON)),         
              employeeHandler::getEmployeeDetails)
              
             .andRoute(GET("/emp/controller/getDetailsById/{id}")
             .and(accept(MediaType.APPLICATION_JSON)),           
              employeeHandler::getEmployeeDetailByEmployeeId)
     
             .andRoute(POST("/emp/controller/addEmp")
             .and(accept(MediaType.APPLICATION_JSON)),
              employeeHandler::addEmployee)
                         
             .andRoute(PUT("/emp/controller/updateEmp")
             .and(accept(MediaType.APPLICATION_JSON)),
             employeeHandler::updateEmployee)
                         
             .andRoute(DELETE("/emp/controller/deleteEmp/{id}")
             .and(accept(MediaType.APPLICATION_JSON)),
             employeeHandler::deleteEmployee);
        return routerFunction;
    }
   
}

ApplicationBootUp.java

01
02
03
04
05
06
07
08
09
10
package com.webflux;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ApplicationBootUp {
    public static void main(String[] args) {
        SpringApplication.run(ApplicationBootUp.class);
    }
 
}

Inside application.properties only server port is mentioned: server.port=8090.

Application can be deployed using the command:  clean install spring-boot:run and tested using the postman client.

7. References:

8. Download the Eclipse Project

Download
You can download the full source code of this example here: SpringWebFlux
Do you want to know how to develop your skillset to become a Java Rockstar?
Subscribe to our newsletter to start Rocking right now!
To get you started we give you our best selling eBooks for FREE!
1. JPA Mini Book
2. JVM Troubleshooting Guide
3. JUnit Tutorial for Unit Testing
4. Java Annotations Tutorial
5. Java Interview Questions
6. Spring Interview Questions
7. Android UI Design
and many more ....
I agree to the Terms and Privacy Policy

Mohpreet Singh

Mohpreet is Java developer and trainer. He is passionate in learning new technologies, having a detailed exposure on the related technology and publishing the related content.
Subscribe
Notify of
guest


This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Back to top button