Reactive Web Applications Using SpringWebFlux
1. Introduction to Reactive Programming
Reactive programming is a term coined for the applications that has the following characteristics:
- Non-blocking applications
- Event-driven and asynchronous
- Require a small number of threads to scale vertically (i.e. within the JVM)
Just like object-oriented programming, functional programming, or procedural programming, reactive programming is just another programming paradigm. It makes our program: Responsive, Resilient, Elastic.
2. Reactive Programming in Spring
The Spring Framework uses Reactor internally for its own reactive support. Reactor is an implementation of Reactive Streams (publishers, introduced in Java9). Reactor has the following two datatypes:
- Flux (it is a Stream which can emit 0 or more elements)
- Mono (it is a Stream which can emit 0 or 1 element)
Spring exposes these types from its API, thereby making the application reactive.
In Spring 5, a new module has been introduced called WebFlux, which gives the support for creating the reactive web applications using: HTTP(REST) and web sockets.
Spring Web Flux supports the following two models:
- Functional Model
- Annotation Model
In this article we will explore the Functional Model.
Following table compares normal Spring with Web Flux:
Traditional Stack | Reactive Stack |
Spring Web MVC | Spring WebFlux |
Controller and Handler Mapping | Router Functions |
Servlet API | HTTP/ Reactive Stream |
Servlet Containers | Any servlet container with support for Servlet 3.1+, Tomcat 8.x,Jetty, Netty, UnderTow |
3. Use Case
A REST API using Spring Web Flux has to be created for an Employee Management system that will expose the CRUD on Employee.
Note: DAO layer of the Project is hard coded.
4. Software and Environment needed
- Java: 1.8 or above
- Maven: 3.3.9 or above
- Eclipse Luna or above
- Spring Boot: 2.0.0.M4
- Spring Boot Starter WebFlux
- Postman for Testing the Application
5. Flow of the Application
Spring5 WebFlux’s Functional model is an alternative to using Spring MVC style annotations. In Spring WebFlux Functional model, Routers and handler functions are used to create a MVC application.HTTP request is routed via router function (alternative to annotations like @RequestMapping
) and request is handled via handler function (alternative to @Controller
handler methods).
Each handler function will take ServerRequest (org.springframework.web.reactive.function.server.ServerRequest
) as a parameter and as result will return the Mono<ServerResponse>
or Flux<ServerResponse>
(org.springframework.web.reactive.function.server.ServerResponse
).
6. Code For the Use Case and Description
pom.xml
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 | < project xmlns = "http://maven.apache.org/POM/4.0.0" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" < modelVersion >4.0.0</ modelVersion > < groupId >com.webflux</ groupId > < artifactId >Demo_Spring_MVC_Web_Flux</ artifactId > < version >0.0.1-SNAPSHOT</ version > < repositories > < repository > < id >spring-snapshots</ id > < name >Spring Snapshots</ name > < snapshots > < enabled >true</ enabled > </ snapshots > </ repository > < repository > < id >spring-milestones</ id > < name >Spring Milestones</ name > < snapshots > < enabled >false</ enabled > </ snapshots > </ repository > </ repositories > < pluginRepositories > < pluginRepository > < id >spring-snapshots</ id > < name >Spring Snapshots</ name > < snapshots > < enabled >true</ enabled > </ snapshots > </ pluginRepository > < pluginRepository > < id >spring-milestones</ id > < name >Spring Milestones</ name > < snapshots > < enabled >false</ enabled > </ snapshots > </ pluginRepository > </ pluginRepositories > < parent > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-parent</ artifactId > < version >2.0.0.M4</ version > < relativePath /> <!-- lookup parent from repository --> </ parent > < properties > < project.build.sourceEncoding >UTF-8 </ project.build.sourceEncoding > < project.reporting.outputEncoding >UTF-8 </ project.reporting.outputEncoding > <!-- Configuring Java 8 for the Project --> < java.version >1.8</ java.version > </ properties > <!--Excluding Embedded tomcat to make use of the Netty Server--> < dependencies > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-web</ artifactId > < exclusions > < exclusion > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-tomcat</ artifactId > </ exclusion > </ exclusions > </ dependency > < dependency > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-starter-webflux</ artifactId > </ dependency > </ dependencies > < build > < plugins > < plugin > < groupId >org.springframework.boot</ groupId > < artifactId >spring-boot-maven-plugin</ artifactId > </ plugin > </ plugins > </ build > </ project > |
EmployeeDAO.java
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 | package com.webflux.dao; import java.util.LinkedHashMap; import java.util.Map; import org.springframework.stereotype.Repository; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import com.webflux.bussiness.bean.Employee; @Repository public class EmployeeDAO { /** * Map is used to Replace the Database * */ static public Map mapOfEmloyeess = new LinkedHashMap(); static int count= 10004 ; static { mapOfEmloyeess.put( 10001 , new Employee( "Jack" , 10001 , 12345.6 , 1001 )); mapOfEmloyeess.put( 10002 , new Employee( "Justin" , 10002 , 12355.6 , 1002 )); mapOfEmloyeess.put( 10003 , new Employee( "Eric" , 10003 , 12445.6 , 1003 )); } /** * Returns all the Existing Employees as Flux * */ public Flux getAllEmployee(){ return Flux.fromStream(mapOfEmloyeess.values().stream()); } /**Get Employee details using EmployeeId . * Returns a Mono response with Data if Employee is Found * Else returns a null * */ public Mono getEmployeeDetailsById( int id){ Monores = null ; Employee emp =mapOfEmloyeess.get(id); if (emp!= null ){ res=Mono.just(emp); } return res; } /**Create Employee details. * Returns a Mono response with auto-generated Id * */ public Mono addEmployee(Employee employee){ count++; employee.setEmployeeId(count); mapOfEmloyeess.put(count, employee); return Mono.just(count); } /**Update the Employee details, * Receives the Employee Object and returns the updated Details * as Mono * */ public Mono updateEmployee (Employee employee){ mapOfEmloyeess.put(employee.getEmployeeId(), employee); return Mono.just(employee); } /**Delete the Employee details, * Receives the EmployeeID and returns the deleted employee Details * as Mono * */ public Mono removeEmployee ( int id){ Employee emp= mapOfEmloyeess.remove(id); return Mono.just(emp); } } |
It can be observed that all the methods of the EmployeeDAO
are returning either Mono or Flux Response, thereby making the DAO Layer Reactive.
EmployeeHandler.java
001 002 003 004 005 006 007 008 009 010 011 012 013 014 015 016 017 018 019 020 021 022 023 024 025 026 027 028 029 030 031 032 033 034 035 036 037 038 039 040 041 042 043 044 045 046 047 048 049 050 051 052 053 054 055 056 057 058 059 060 061 062 063 064 065 066 067 068 069 070 071 072 073 074 075 076 077 078 079 080 081 082 083 084 085 086 087 088 089 090 091 092 093 094 095 096 097 098 099 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 | package com.webflux.web.handler; import static org.springframework.web.reactive.function.BodyInserters.fromObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; import org.springframework.web.reactive.function.BodyExtractors; import org.springframework.web.reactive.function.server.ServerRequest; import org.springframework.web.reactive.function.server.ServerResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import com.webflux.bussiness.bean.Employee; import com.webflux.dao.EmployeeDAO; @Controller public class EmployeeHandler { @Autowired private EmployeeDAO employeeDAO; /** * Receives a ServerRequest. * Invokes the method getAllEmployee() from EmployeeDAO. * Prepares a Mono and returns the same. * */ public Mono getEmployeeDetails(ServerRequest request) { Flux res=employeeDAO.getAllEmployee(); return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON) .body(res,Employee. class ); } /** * Receives a ServerRequest. * Extracts the Path Variable (named id) from the Request. * Invokes the method [getEmployeeDetailsById()] from EmployeeDAO. * Verifies if the object returned in the previous step is null * then returns a Bad request with appropriate message. * Else Returns the Mono with the Employee Data. * */ public Mono getEmployeeDetailByEmployeeId(ServerRequest request) { //Extracts the Path Variable id from the Request int id =Integer.parseInt(request.pathVariable( "id" )); Mono employee = employeeDAO.getEmployeeDetailsById(id); Mono res= null ; if (employee== null ){ res=ServerResponse.badRequest().body (fromObject( "Please give a valid employee Id" )); } else { //Converting Mono of Mono type to Mono res=employee.flatMap(x->ServerResponse.ok().body(fromObject(x))); } return res; } /** * Receives a ServerRequest. * Makes use of BodyExtractors and Extracts the Employee Data as * Mono from the ServerRequest. * Invokes the method [addEmployee()] of the EmployeeDAO. * Prepares a Mono and returns the same. * */ public Mono addEmployee(ServerRequest request) { Mono requestBodyMono = request.body(BodyExtractors.toMono(Employee. class )); Mono mono= employeeDAO.addEmployee(requestBodyMono.block()); //Converting Mono of Mono type to Mono Mono res= mono.flatMap(x->ServerResponse.ok().body (fromObject( "Employee Created with Id" +x))); return res; } /** * Receives a ServerRequest. * Makes use of BodyExtractors and Extracts the Employee Data as * Mono from the ServerRequest. * Finds the Employee and updates the details by invoking updateEmployee() of * EmployeeDAO. * Prepares a Mono and returns the same. * */ public Mono updateEmployee(ServerRequest request) { Mono requestBodyMono = request.body(BodyExtractors.toMono(Employee. class )); Employee employee = requestBodyMono.block(); Mono employeeRet = employeeDAO.getEmployeeDetailsById(employee.getEmployeeId()); Mono res= null ; if (employeeRet== null ){ res=ServerResponse.badRequest().body(fromObject ( "Please Give valid employee details to update" )); } else { Mono emp= employeeDAO.updateEmployee(employee); //Converting Mono of Mono type to Mono res=emp.flatMap(x->ServerResponse.ok().body(fromObject(x))); } return res; } /** * Receives a ServerRequest. * Makes use of BodyExtractors and Extracts the Employee Data as * Mono from the ServerRequest. * Finds the Employee and deletes the details by invoking removeEmployee() of * EmployeeDAO. * Prepares a Mono and returns the same. * */ public Mono deleteEmployee(ServerRequest request) { int myId = Integer.parseInt(request.pathVariable( "id" )); Mono res= null ; if (employeeDAO.getEmployeeDetailsById(myId) == null ) { res=ServerResponse.badRequest().body (fromObject( "Please Give valid employee details to delete" )); } else { Mono employee = employeeDAO.removeEmployee(myId); //Converting Mono of Mono type to Mono res=employee.flatMap(x->ServerResponse.ok().body(fromObject(x))); } return res; } } |
It can be observed that all the methods of the Handler are returning Mono<ServerResponse>
, thereby making the Presentation Layer Reactive.
Note: Event Handler method should accept the ServerRequest and should return Mono<ServerResponse>
RouterConfiguration.java
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 | package com.webflux.web.router.config; import static org.springframework.web.reactive.function.server.RequestPredicates.DELETE; import static org.springframework.web.reactive.function.server.RequestPredicates.GET; import static org.springframework.web.reactive.function.server.RequestPredicates.POST; import static org.springframework.web.reactive.function.server.RequestPredicates.PUT; import static org.springframework.web.reactive.function.server.RequestPredicates.accept; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.MediaType; import org.springframework.web.reactive.function.server.RouterFunction; import org.springframework.web.reactive.function.server.RouterFunctions; import org.springframework.web.reactive.function.server.ServerResponse; import com.webflux.web.handler.EmployeeHandler; @Configuration /** * Router is configuration class. * It links the incoming requests with appropriate HTTP methods to the * respective method of the EmployeeHandler. * Method references are used for the mapping. * */ public class RouterConfiguration{ @Autowired EmployeeHandler employeeHandler; @Bean public RouterFunction monoRouterFunction() { RouterFunctionrouterFunction= RouterFunctions. route(GET( "/emp/controller/getDetails" ). and(accept(MediaType.APPLICATION_JSON)), employeeHandler::getEmployeeDetails) .andRoute(GET( "/emp/controller/getDetailsById/{id}" ) .and(accept(MediaType.APPLICATION_JSON)), employeeHandler::getEmployeeDetailByEmployeeId) .andRoute(POST( "/emp/controller/addEmp" ) .and(accept(MediaType.APPLICATION_JSON)), employeeHandler::addEmployee) .andRoute(PUT( "/emp/controller/updateEmp" ) .and(accept(MediaType.APPLICATION_JSON)), employeeHandler::updateEmployee) .andRoute(DELETE( "/emp/controller/deleteEmp/{id}" ) .and(accept(MediaType.APPLICATION_JSON)), employeeHandler::deleteEmployee); return routerFunction; } } |
ApplicationBootUp.java
01 02 03 04 05 06 07 08 09 10 | package com.webflux; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class ApplicationBootUp { public static void main(String[] args) { SpringApplication.run(ApplicationBootUp. class ); } } |
Inside application.properties only server port is mentioned: server.port=8090
.
Application can be deployed using the command: clean install spring-boot:run
and tested using the postman client
.
7. References:
- https://docs.spring.io/spring/docs/5.0.0.BUILD-SNAPSHOT/spring-framework-reference/html/web-reactive.html
- http://www.baeldung.com/reactor-core
- http://www.baeldung.com/spring-5-functional-web
8. Download the Eclipse Project
You can download the full source code of this example here: SpringWebFlux