Using rx-java Observable in a Spring MVC flow
Spring MVC has supported asynchronous request processing for some time now. Internally, this support builds on the Servlet 3 async support of containers like Tomcat and Jetty.
Spring Web Async support
Consider a service call that takes a little while to process, simulated with a delay:
    public CompletableFuture<Message> getAMessageFuture() {
        return CompletableFuture.supplyAsync(() -> {
            logger.info("Start: Executing slow task in Service 1");
            Util.delay(1000);
            logger.info("End: Executing slow task in Service 1");
            return new Message("data 1");
        }, futureExecutor);
    }
If I were to call this service in a user request flow, the traditional blocking controller flow would look like this:
    @RequestMapping("/getAMessageFutureBlocking")
    public Message getAMessageFutureBlocking() throws Exception {
        return service1.getAMessageFuture().get();
    }
A better approach is to use Spring's asynchronous support to return the result to the user once the CompletableFuture makes it available, so that the container's thread is not held up in the meantime:
    @RequestMapping("/getAMessageFutureAsync")
    public DeferredResult<Message> getAMessageFutureAsync() {
        // The timeout parameter is a Long, so the literal needs the L suffix.
        DeferredResult<Message> deferred = new DeferredResult<>(90000L);
        CompletableFuture<Message> f = this.service1.getAMessageFuture();
        f.whenComplete((res, ex) -> {
            if (ex != null) {
                deferred.setErrorResult(ex);
            } else {
                deferred.setResult(res);
            }
        });
        return deferred;
    }
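The difference between the two controllers comes down to `get()` versus `whenComplete(...)`. The following is a minimal JDK-only sketch of the callback style, not code from the sample project: an `AtomicReference` stands in for the `DeferredResult`, a `CountDownLatch` stands in for the slow work, and the class and helper names (`CallbackDemo`, `awaitQuietly`) are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class CallbackDemo {

    /** Returns what the caller saw right after registering the callback, and what it saw at the end. */
    public static String run() {
        ExecutorService futureExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch slowTaskMayFinish = new CountDownLatch(1); // hypothetical stand-in for the slow work
        CountDownLatch callbackRan = new CountDownLatch(1);
        AtomicReference<String> holder = new AtomicReference<>("unset"); // stand-in for DeferredResult
        try {
            CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
                awaitQuietly(slowTaskMayFinish); // the task cannot finish until the latch is released
                return "data 1";
            }, futureExecutor);

            // Registering the callback does not wait for the task.
            f.whenComplete((res, ex) -> {
                holder.set(ex != null ? "error: " + ex : res);
                callbackRan.countDown();
            });

            // The caller gets here while the task is still running.
            String seenByCaller = holder.get();

            slowTaskMayFinish.countDown(); // let the slow task complete
            awaitQuietly(callbackRan);     // wait until the callback has stored the result
            return seenByCaller + " -> " + holder.get();
        } finally {
            futureExecutor.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for latch");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Calling `CallbackDemo.run()` returns `"unset -> data 1"`: the caller continues past `whenComplete` before the task has produced anything, which is what frees the container thread in the controller, whereas a call to `f.get()` at the same point would have parked the caller until the task finished.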
Using Observable in an Async Flow
Now to the topic of this article: I have lately been using Rx-java’s excellent Observable type as the return type of my services, and wanted to ensure that the web layer also remains asynchronous when processing the Observable returned from a service call.
Consider the service described above, now modified to return an Observable:
    public Observable<Message> getAMessageObs() {
        return Observable.<Message>create(s -> {
            logger.info("Start: Executing slow task in Service 1");
            Util.delay(1000);
            s.onNext(new Message("data 1"));
            logger.info("End: Executing slow task in Service 1");
            s.onCompleted();
        }).subscribeOn(Schedulers.from(customObservableExecutor));
    }
I can nullify all the benefits of returning an Observable by ending up with a blocking call at the web layer. A naive call would be the following:
    @RequestMapping("/getAMessageObsBlocking")
    public Message getAMessageObsBlocking() {
        return service1.getAMessageObs().toBlocking().first();
    }
To make this flow async through the web layer, a better way to handle this call is to transform the Observable into Spring’s DeferredResult type:
    @RequestMapping("/getAMessageObsAsync")
    public DeferredResult<Message> getAMessageAsync() {
        Observable<Message> o = this.service1.getAMessageObs();
        // The timeout parameter is a Long, so the literal needs the L suffix.
        DeferredResult<Message> deferred = new DeferredResult<>(90000L);
        o.subscribe(m -> deferred.setResult(m), e -> deferred.setErrorResult(e));
        return deferred;
    }
This ensures that the thread handling the user request returns as soon as the subscription is set up, without waiting for the service call to complete. The response is then processed reactively once the Observable emits a value.
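One case the two-callback subscription does not cover is an Observable that completes without emitting anything: nothing ever sets the DeferredResult, so the request simply waits for the timeout. The sketch below illustrates one way to handle that, using JDK-only stand-ins rather than the real types: `java.util.concurrent.Flow.Publisher` (Java 9+) plays the role of the Observable and `CompletableFuture` plays the role of the DeferredResult. The class and method names (`FirstValueBridge`, `firstValue`, `demo`) are hypothetical and not part of Spring, Rx-java, or the sample project.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FirstValueBridge {

    /** Subscribes to the source and completes the returned future with its first item. */
    public static <T> CompletableFuture<T> firstValue(Flow.Publisher<T> source) {
        CompletableFuture<T> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<T>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // a single response only needs a single value
            }

            @Override
            public void onNext(T item) {
                result.complete(item);  // counterpart of deferred.setResult(m)
                subscription.cancel();  // later values could not be used anyway
            }

            @Override
            public void onError(Throwable t) {
                result.completeExceptionally(t); // counterpart of deferred.setErrorResult(e)
            }

            @Override
            public void onComplete() {
                // Empty source: fail right away instead of leaving the caller to time out.
                // This is a no-op if a value or an error has already been delivered.
                result.completeExceptionally(
                        new NoSuchElementException("source completed without a value"));
            }
        });
        return result;
    }

    /** Publishes one value through the bridge and returns what the future received. */
    public static String demo() {
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            CompletableFuture<String> result = firstValue(publisher);
            publisher.submit("data 1");
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In the Rx-java controller, the equivalent change would presumably be to pass a third, completion callback to `subscribe` that sets an error result when no value has been emitted.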
If you are interested in exploring this further, here is a GitHub repo with working samples: https://github.com/bijukunjummen/spring-web-observable.
References:
Spring’s reference guide on async flows in the web tier: http://docs.spring.io/spring/docs/current/spring-framework-reference/html/mvc.html#mvc-ann-async
More details on Spring DeferredResult by the inimitable Tomasz Nurkiewicz at the NoBlogDefFound blog – http://www.nurkiewicz.com/2013/03/deferredresult-asynchronous-processing.html
Reference: Using rx-java Observable in a Spring MVC flow, from our JCG partner Biju Kunjummen at the all and sundry blog.