OO Functional Imperative Reactive weaved together with First Class Procedures
This is the first of a two-part article discussing the “First Class Procedure”, a term I’m using to best describe the concept.
While Functional Programming provides composition through first class functions and Object Orientation provides composition through objects, first class procedures are to me a fusion of the two concepts. I find Object Orientation suffers issues of implicit object references (e.g. wanting a banana and getting the gorilla with the jungle), while functional programming (and OO) suffers issues of implicit threads. The first class procedure tackles these issues so that composition, state and execution are all separate concerns.
A complete description of the first class procedure is beyond a single article, as many patterns are used together to enable composition through first class procedures. Therefore, I’m going to provide an introduction to first class procedures in two parts:
- This article to demonstrate with working code how flexible and easy composition is with first class procedures
- Next article to provide an explanation more closely aligned to the theory on how the first class procedure has evolved to its current understanding
We’ll start with some simple examples and then get to the more interesting weaving together of first class procedures.
First class procedure
Simple event loop
The following first class procedure services a REST request. This will be run on the HTTP socket event loop thread.
    public void service(ObjectResponse<ServicedThreadResponse> response) {
        response.send(new ServicedThreadResponse(Thread.currentThread().getName(), "Event", System.currentTimeMillis()));
    }
Simple thread-per-request
The following first class procedure services a REST request by pulling a value from the database and sending it in the response. This will be run by a separate thread pool.
    public void service(ServicedThreadRequest request, ThreadPerRequestRepository repository, ObjectResponse<ServicedThreadResponse> response) {
        int identifier = request.getIdentifier() % 10;
        ThreadPerRequest entity = repository.findById(identifier).get();
        response.send(new ServicedThreadResponse(Thread.currentThread().getName(), entity.getName(), System.currentTimeMillis()));
    }
How the thread is chosen for each procedure will be discussed later. However, for now notice that a Spring Repository is used by only the thread-per-request first class procedure.
First Class Procedures weaved together
Ok, the above is a little boring. We’ve seen this in web application servers before. Show us something interesting!
To show something more interesting we are going to weave first class procedures together to achieve the following:
- Validate a request (on socket event loop thread).
- Start a transaction and register the request in the database. This will be on another thread to avoid halting the socket event loop thread.
- Make reactive calls to pull in data from other services.
- Run some functional code to work out the standard deviation on service times.
- Undertake alternate flows to handle special cases (including handling exceptions). Then, if no exception has caused a rollback, store the results in the database. This again is on a different thread so as not to tie up the reactive event loop thread.
- Send the response after committing the transaction
We’ll address each first class procedure in the order above.
Validate request (on socket event loop)
This is simple validation that the request is correct. As it is straightforward logic, we use the thread of the socket event loop. This way we don’t pay the overhead of a thread context switch just to reject invalid requests. The code is as follows:
    const HttpException = Java.type("net.officefloor.server.http.HttpException");
    const Integer = Java.type("java.lang.Integer")

    function validate(identifier, requestIdentifier) {
        if (Number(identifier) <= 0) {
            throw new HttpException(422, "Invalid identifier");
        }
        requestIdentifier.set(Integer.valueOf(identifier))
    }
    validate.officefloor = [
        { httpPathParameter: "identifier" },
        { out: Integer },
        { next: "valid" }
    ];
Note that the validation is written in JavaScript. This is so that the client side JavaScript validation rules can be re-used to validate requests to ensure consistency between client and server.
The officefloor attribute added to the function provides meta-data. This is necessary, as JavaScript does not provide the strongly typed information required of first class procedures.
Imperative to register request in database
After validation, the request identifier is registered in the database. This also creates a unique number for the request based on an IDENTITY column in the database.
    @Next("registered")
    public static void registerRequest(@Val int requestIdentifier, WeavedRequestRepository repository, Out<WeavedRequest> weavedRequest) {
        WeavedRequest entity = new WeavedRequest(requestIdentifier);
        repository.save(entity);
        weavedRequest.set(entity);
    }
Reactive
Next is some Reactive code to concurrently call the two REST endpoints detailed at the start of this article (simple event loop and simple thread-per-request). Because we are using Reactive, we can call them concurrently to improve performance.
Note that while waiting on the responses, the flow is effectively idle with threads servicing other functionality. This is asynchronous handling so that threads are not tied up waiting. Once both sets of results come back, they notify the respective asynchronous flow to continue processing.
    @Next("useData")
    public static void retrieveData(WebClient client,
            AsynchronousFlow eventLoopFlow, @EventLoopResponse Out<ServicedThreadResponse[]> eventLoopResponse,
            @Val WeavedRequest request,
            AsynchronousFlow threadPerRequestFlow, @ThreadPerRequestResponse Out<ServicedThreadResponse[]> threadPerRequestResponse) {

        // Ten concurrent calls to the event loop endpoint
        Flux.range(1, 10)
                .map((index) -> client.get().uri(URL, "event-loop").retrieve().bodyToMono(ServicedThreadResponse.class))
                .flatMap((response) -> response).collectList().subscribe((responses) -> eventLoopFlow.complete(
                        () -> eventLoopResponse.set(responses.stream().toArray(ServicedThreadResponse[]::new))));

        // Ten concurrent calls to the thread-per-request endpoint
        Flux.range(1, 10)
                .map((index) -> client.post().uri(URL, "thread-per-request").contentType(MediaType.APPLICATION_JSON)
                        .syncBody(new ServicedThreadRequest(request.getId())).retrieve()
                        .bodyToMono(ServicedThreadResponse.class))
                .flatMap((response) -> response).collectList().subscribe((responses) -> threadPerRequestFlow.complete(
                        () -> threadPerRequestResponse.set(responses.stream().toArray(ServicedThreadResponse[]::new))));
    }
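For readers less familiar with Reactive code, the fan-out and join idea can be sketched with only the JDK's CompletableFuture. This is not the WebClient code used by the application: `callService` is a hypothetical stand-in for a remote call, and the class and method names are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ConcurrentCallsSketch {

    // Hypothetical stand-in for a REST call (a real client would do non-blocking I/O here)
    static CompletableFuture<String> callService(String path, int index) {
        return CompletableFuture.supplyAsync(() -> path + "-" + index);
    }

    // Fires 'count' calls without waiting on any of them, then completes once all have responded
    static CompletableFuture<List<String>> callMany(String path, int count) {
        List<CompletableFuture<String>> calls = IntStream.rangeClosed(1, count)
                .mapToObj(index -> callService(path, index))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(calls.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> calls.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> eventLoop = callMany("event-loop", 10);
        CompletableFuture<List<String>> threadPerRequest = callMany("thread-per-request", 10);

        // Continue only once both sets of results are available
        int total = eventLoop.thenCombine(threadPerRequest, (first, second) -> first.size() + second.size()).join();
        System.out.println(total + " responses received");
    }
}
```

The calling thread only blocks at the final `join()`, which is there so the sketch can print a result. In the article's code nothing blocks: each `AsynchronousFlow` is completed from the subscriber callback instead.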
By now you may be noticing the Out/@Val combinations. This is how values are passed from one first class procedure to another. Note that if different values have the same type, a qualifier can be used to distinguish them. The rest of the arguments are provided by dependency injection (in this case Spring).
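To make the idea of passing values between procedures concrete, here is a minimal sketch in plain Java. It is only an illustration of the concept, not how OfficeFloor implements it: the `Out` interface, `FlowValues` store and qualifier strings below are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class ValuePassingSketch {

    // Hypothetical equivalent of an Out parameter: the procedure's only way to publish a value
    interface Out<T> {
        void set(T value);
    }

    // Hypothetical store scoped to one flow, keyed by qualifier plus type
    static class FlowValues {
        private final Map<String, Object> values = new HashMap<>();

        <T> Out<T> out(String qualifier, Class<T> type) {
            return value -> values.put(qualifier + ":" + type.getName(), value);
        }

        <T> T val(String qualifier, Class<T> type) {
            return type.cast(values.get(qualifier + ":" + type.getName()));
        }
    }

    // First procedure: publishes a value and knows nothing of who consumes it
    static void register(int requestIdentifier, Out<String> registered) {
        registered.set("request-" + requestIdentifier);
    }

    // Later procedure: receives the value and knows nothing of who produced it
    static String describe(String registered) {
        return "Handling " + registered;
    }

    public static void main(String[] args) {
        FlowValues flow = new FlowValues();
        register(1, flow.out("registered", String.class));
        System.out.println(describe(flow.val("registered", String.class))); // prints "Handling request-1"
    }
}
```

Because neither procedure references the other, either can be swapped or reordered by changing only the composition.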
Functional
Next the reactive responses are provided to Scala functional code to determine the standard deviation of service times.
    def mean(timestamps: Iterable[Long]): Double = timestamps.sum.toDouble / timestamps.size

    def variance(timestamps: Iterable[Long]): Double = {
      val avg = mean(timestamps)
      timestamps.map(timestamp => math.pow(timestamp.toDouble - avg, 2)).sum / timestamps.size
    }

    def stdDev(timestamps: Iterable[Long]): Double = math.sqrt(variance(timestamps))

    @Next("use")
    def standardDeviation(@EventLoopResponse @Val eventLoopResponses: Array[ServicedThreadResponse], @ThreadPerRequestResponse @Val threadPerRequestResponses: Array[ServicedThreadResponse]): Double =
      stdDev((eventLoopResponses ++ threadPerRequestResponses).map(response => response.getTimestamp))
Note that a library could be used to reduce this code. However, we’ve done this to demonstrate how functional code can be integrated into first class procedures.
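As a quick check of the arithmetic, the same calculation is sketched here in Java with a small worked input. Like the Scala code, it divides by the number of values, so it is the population standard deviation. The class name and sample values are made up for illustration.

```java
import java.util.stream.LongStream;

public class StdDevSketch {

    static double mean(long[] values) {
        return LongStream.of(values).average().orElse(Double.NaN);
    }

    // Population variance: average of the squared distances from the mean
    static double variance(long[] values) {
        double avg = mean(values);
        return LongStream.of(values).mapToDouble(value -> Math.pow(value - avg, 2)).sum() / values.length;
    }

    static double stdDev(long[] values) {
        return Math.sqrt(variance(values));
    }

    public static void main(String[] args) {
        // Mean is 40 / 8 = 5, squared distances sum to 32, variance is 32 / 8 = 4
        long[] sample = { 2, 4, 4, 4, 5, 5, 7, 9 };
        System.out.println(stdDev(sample)); // prints 2.0
    }
}
```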
Flow control
The next first class procedure triggers a flow to handle special cases. Should there be no issues with the special cases, then it stores the standard deviation in the database.
    @FlowInterface
    public static interface Flows {
        void handleSpecialCases(FlowSuccessful callback);
        void stored();
    }

    public static void store(@Parameter double standardDeviation, Flows flows, @Val WeavedRequest request, WeavedRequestRepository repository, Out<RequestStandardDeviation> stDevOut) {
        flows.handleSpecialCases(() -> {
            request.setRequestStandardDeviation(new RequestStandardDeviation(standardDeviation, request));
            repository.save(request);
            stDevOut.set(request.getRequestStandardDeviation());
            flows.stored();
        });
    }
The handling of the special cases is by the following first class procedure.
    public static void handleSpecialCase(@Val WeavedRequest request) throws WeavedRollbackException, WeavedCommitException {
        switch (request.getRequestIdentifier()) {
        case 3:
            throw new WeavedRollbackException(request);
        case 4:
            throw new WeavedCommitException(request);
        }
    }
Touch of exception handling
The two exception handling first class procedures are as follows.
    public static void handle(@Parameter WeavedRollbackException exception, ObjectResponse<WeavedErrorResponse> response) {
        WeavedRequest request = exception.getWeavedRequest();
        response.send(new WeavedErrorResponse(request.getRequestIdentifier(), request.getId()));
    }

    public static void handle(@Parameter WeavedCommitException exception, WeavedRequestRepository repository, ObjectResponse<WeavedErrorResponse> response) {
        WeavedRequest request = exception.getWeavedRequest();
        request.setWeavedError(new WeavedError("Request Identifier (" + request.getRequestIdentifier() + ") is special case", request));
        repository.save(request);
        response.send(new WeavedErrorResponse(request.getRequestIdentifier(), request.getId()));
    }
The second handler works within the transaction, so it also stores further data in the database.
Note that because first class procedure composition does not require the caller to catch exceptions, checked exceptions are embraced. We consider checked exceptions very useful information in flow composition. The distinction is that handling them should not be the caller’s concern but rather the flow’s concern. To me this is a big difference, and it stops the catch-and-log exception handling problem. Exception handling is now a separate concern that can be coded in afterwards.
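The idea of the flow, rather than the caller, owning exception handling can be sketched in plain Java as follows. This is only a model of the concept under my own assumptions: the `Procedure` and `Flow` types, the exception classes and the returned strings are hypothetical and are not OfficeFloor's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class ExceptionRoutingSketch {

    // Hypothetical procedure type that is free to declare checked exceptions
    interface Procedure {
        String run() throws Exception;
    }

    static class RollbackException extends Exception {
    }

    static class CommitException extends Exception {
    }

    // Hypothetical flow that owns the mapping from exception type to handling procedure
    static class Flow {
        private final Map<Class<? extends Exception>, Function<Exception, String>> handlers = new LinkedHashMap<>();

        Flow handle(Class<? extends Exception> type, Function<Exception, String> handler) {
            handlers.put(type, handler);
            return this;
        }

        String execute(Procedure procedure) {
            try {
                return procedure.run();
            } catch (Exception ex) {
                // Route to the first handler configured for this exception type
                for (Map.Entry<Class<? extends Exception>, Function<Exception, String>> entry : handlers.entrySet()) {
                    if (entry.getKey().isInstance(ex)) {
                        return entry.getValue().apply(ex);
                    }
                }
                throw new IllegalStateException("No handler configured", ex);
            }
        }
    }

    // The procedure itself contains no try/catch
    static String handleSpecialCase(int requestIdentifier) throws RollbackException, CommitException {
        switch (requestIdentifier) {
        case 3:
            throw new RollbackException();
        case 4:
            throw new CommitException();
        default:
            return "stored";
        }
    }

    static String service(int requestIdentifier) {
        Flow flow = new Flow()
                .handle(RollbackException.class, ex -> "rolled back")
                .handle(CommitException.class, ex -> "committed with error");
        return flow.execute(() -> handleSpecialCase(requestIdentifier));
    }

    public static void main(String[] args) {
        System.out.println(service(1)); // prints "stored"
        System.out.println(service(3)); // prints "rolled back"
        System.out.println(service(4)); // prints "committed with error"
    }
}
```

The checked exceptions in the `throws` clause tell whoever composes the flow exactly which handlers need configuring, without any caller writing a catch block.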
Successful response
On successful storage of the request details in the database, the following first class procedure sends the response.
    public void send(@Val WeavedRequest request, @Val RequestStandardDeviation standardDeviation, @EventLoopResponse @Val ServicedThreadResponse[] eventLoopResponse, @ThreadPerRequestResponse @Val ServicedThreadResponse[] threadPerRequestResponse, ObjectResponse<WeavedResponse> response) {
        response.send(new WeavedResponse(request.getRequestIdentifier(), request.getId(), eventLoopResponse, threadPerRequestResponse, standardDeviation.getStandardDeviation()));
    }
Kotlin for some OO
Oh, and just for a little bit more polyglot fun, the OO objects used to represent the JSON request/responses are the following.
    @HttpObject
    data class ServicedThreadRequest(val identifier: Int)

    data class ServicedThreadResponse(val threadName: String, val lookupName: String, val timestamp: Long)

    data class WeavedErrorResponse(val requestIdentifier: Int, val requestNumber: Int)

    data class WeavedResponse(val requestIdentifier: Int
            , val requestNumber: Int
            , val eventLoopResponses: Array<ServicedThreadResponse>
            , val threadPerRequestResponses: Array<ServicedThreadResponse>
            , val standardDeviation: Double)
Proving it works
The following is a test to confirm the flow of first class procedures services the request.
    public static final SpringRule spring = new SpringRule();

    public static final OfficeFloorRule officeFloor = new OfficeFloorRule();

    @ClassRule
    public static final RuleChain ordered = RuleChain.outerRule(spring).around(officeFloor);

    @Rule
    public final HttpClientRule client = new HttpClientRule();

    private static final ObjectMapper mapper = new ObjectMapper();
    static {
        mapper.registerModule(new KotlinModule());
    }

    @Test
    public void confirmWeavedTogether() throws Exception {
        HttpResponse response = this.client.execute(new HttpPost(this.client.url("/weave/1")));
        assertEquals("Should be successful", 200, response.getStatusLine().getStatusCode());
        WeavedResponse body = mapper.readValue(EntityUtils.toString(response.getEntity()), WeavedResponse.class);
        WeavedRequest entity = spring.getBean(WeavedRequestRepository.class).findById(body.getRequestNumber()).get();
        assertNotNull("Should have standard deviation stored", entity.getRequestStandardDeviation());
    }
Weaving together
The following diagram is the configuration to weave the above first class procedures together.
This is the only configuration/code necessary to compose the first class procedures together. Notice the names represent the first class procedure names and their respective meta-data.
To see what this means, check the port on all the calls and tests. Yes, everything you see above is running off the one port. Yep, you don’t have to choose between a framework that provides only thread-per-request or only single threaded event loops. This is because of the execution strategy provided by Thread Injection of first class procedures.
Thread Injection
The threading configuration is actually the following:
    <teams>
        <team source="net.officefloor.frame.impl.spi.team.ExecutorCachedTeamSource" type="org.springframework.data.repository.CrudRepository" />
    </teams>
Remember I asked you to take note of where a Spring Repository is used? The above configuration has any first class procedure requiring a Spring Repository executed by the configured thread pool. Note that thread pools are named teams, because the modelling origins of first class procedures come from Offices.
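The principle of choosing the thread from a procedure's dependencies can be sketched in plain Java with reflection. This is a simplified model under my own assumptions and not OfficeFloor's implementation: the `Repository` marker, the `Procedures` class and the `invoke` helper are all hypothetical.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadInjectionSketch {

    // Hypothetical stand-in for a blocking dependency such as a Spring Repository
    public interface Repository {
    }

    // Each procedure reports the name of the thread that ran it
    public static class Procedures {
        public static String validate(String identifier) {
            return Thread.currentThread().getName();
        }

        public static String register(String identifier, Repository repository) {
            return Thread.currentThread().getName();
        }
    }

    // The dependency decides the thread: any Repository parameter means "use the team"
    static boolean requiresTeam(Method procedure) {
        return Arrays.stream(procedure.getParameterTypes()).anyMatch(Repository.class::isAssignableFrom);
    }

    // Runs the named procedure on the team if required, otherwise on the calling thread
    static Object invoke(ExecutorService team, String name, Object... args) {
        try {
            Method procedure = Arrays.stream(Procedures.class.getMethods())
                    .filter(method -> method.getName().equals(name)).findFirst()
                    .orElseThrow(() -> new IllegalArgumentException("Unknown procedure " + name));
            Callable<Object> call = () -> procedure.invoke(null, args);
            return requiresTeam(procedure) ? team.submit(call).get() : call.call();
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        ExecutorService team = Executors.newCachedThreadPool(runnable -> new Thread(runnable, "repository-team"));
        try {
            System.out.println("validate ran on " + invoke(team, "validate", "1"));
            System.out.println("register ran on " + invoke(team, "register", "1", new Repository() {
            }));
        } finally {
            team.shutdown();
        }
    }
}
```

Note that the procedures contain no threading code at all. Unlike this sketch, which blocks the caller on `get()` for simplicity, the real flow hands execution over to the other thread rather than waiting on it.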
Therefore, looking at the flow again, the thread execution is as follows:
- Validate uses the thread of the socket listener event loop
- Register request uses a Spring Repository, so execution is swapped to a thread from the configured thread pool
- This thread carries on to trigger the asynchronous reactive calls
- The reactive event loop thread then invokes the callbacks. As the Scala code is quick to execute, the reactive event loop thread carries on to execute the Scala pure function. Here it is deemed that a thread context switch is too much overhead, and it is more efficient to just invoke the highly optimised Scala pure function. However, if we want to run the Scala function on a different thread pool, we can configure one (typically via a marker dependency on the first class procedure).
- The remaining imperative code switches back to a thread from the configured thread pool, as it depends on a Spring Repository. Furthermore, the thread locals are propagated to each thread used, so the Spring Repository transaction is not lost (i.e. the transaction is active for all first class procedures within the transaction bounds).
- Response is then sent.
Now all the above is configurable via Thread Injection. If we have, for example, more than one synchronous data store, we can create a thread pool to interact with each data store to avoid one slow data store tying up all threads of the application.
This also means you can configure different threading for different environments without having to change any code.
Disclaimer
In a real world application, I would try to avoid using so many of the above programming languages together. I’d try to streamline them to just a couple, so that the number of skill sets involved does not drive up the maintenance costs of your application (it also reduces problems with mixed compiling). This is only a demonstration of how OO, Functional, Imperative and Reactive code can all be weaved together with first class procedures. Furthermore, it demonstrates how you can write concrete solutions before abstracting.
Also, as you can see we’ve had to cover a lot of breadth in each programming paradigm. If the code is not a good representation of the paradigm, we’re very happy to take feedback on improvements from those more acquainted with a particular paradigm.
And if we’ve missed an important paradigm, please let me know so we can consider including it. When it comes to coding we appreciate diversity to give developers choice. We’re trying to tear down fences between the paradigms to have one big happy coding family.
Summary
We’ve demonstrated how the first class procedure can weave together polyglot code written in different paradigms to service a request. The code outlined above in the article is all the code required for the application. There is no further weaving code required.
Furthermore, to avoid the “it only works on my machine” problem (in this article), the code for the above is available here. See the readme on how to run it.
For more understanding of what’s going on, see the tutorials, my other articles and in particular my next article.
Published on Java Code Geeks with permission by Daniel Sagenschneider, partner at our JCG program. See the original article here: OO Functional Imperative Reactive weaved together with First Class Procedures. Opinions expressed by Java Code Geeks contributors are their own.