Functional Hystrix using Spring Cloud HystrixCommands
Spring’s WebClient provides a non-blocking client for making service-to-service calls. Hystrix, though now in maintenance mode, has been used to protect service-to-service calls by preventing cascading failures and providing circuit breakers for calls to slow or faulty upstream services.
In this post, I will be exploring how Spring Cloud provides a newer functional approach to wrapping a remote call with Hystrix.
Consider a simple service that returns a list of entities, say a list of cities, modeled using the excellent WireMock tool:
    WIREMOCK_SERVER.stubFor(WireMock.get(WireMock.urlMatching("/cities"))
            .withHeader("Accept", WireMock.equalTo("application/json"))
            .willReturn(WireMock.aResponse()
                    .withStatus(HttpStatus.OK.value())
                    .withFixedDelay(5000)
                    .withHeader("Content-Type", "application/json")))
When called with the URI “/cities”, this WireMock endpoint responds, after a delay of 5 seconds, with JSON of the following form:
    [
      {
        "country": "USA",
        "id": 1,
        "name": "Portland",
        "pop": 1600000
      },
      {
        "country": "USA",
        "id": 2,
        "name": "Seattle",
        "pop": 3200000
      },
      {
        "country": "USA",
        "id": 3,
        "name": "SFO",
        "pop": 6400000
      }
    ]
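The Kotlin samples that follow import a `City` type from `org.bk.samples.model`, which is not shown in this post. A minimal sketch of what such a model might look like, assuming its property names simply mirror the JSON keys above (the `sampleCities` value is a hypothetical helper added for illustration):

```kotlin
// Hypothetical stand-in for org.bk.samples.model.City, which the post imports
// but does not show; property names are assumed to mirror the JSON keys.
data class City(
    val id: Long,
    val name: String,
    val country: String,
    val pop: Long
)

// The three cities from the sample payload, built by hand for illustration.
val sampleCities: List<City> = listOf(
    City(id = 1, name = "Portland", country = "USA", pop = 1_600_000),
    City(id = 2, name = "Seattle", country = "USA", pop = 3_200_000),
    City(id = 3, name = "SFO", country = "USA", pop = 6_400_000)
)
```

A data class keeps the model immutable and gives it `equals`, `hashCode` and `toString` for free, which is convenient when asserting on results in tests.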
Traditional approach
There are many approaches to using Hystrix. I have traditionally preferred one where an explicit Hystrix Command protects the remote call, along these lines:
    import com.netflix.hystrix.HystrixCommandGroupKey
    import com.netflix.hystrix.HystrixCommandKey
    import com.netflix.hystrix.HystrixCommandProperties
    import com.netflix.hystrix.HystrixObservableCommand
    import org.bk.samples.model.City
    import org.slf4j.Logger
    import org.slf4j.LoggerFactory
    import org.springframework.http.MediaType
    import org.springframework.web.reactive.function.client.WebClient
    import org.springframework.web.reactive.function.client.bodyToFlux
    import org.springframework.web.util.UriComponentsBuilder
    import reactor.core.publisher.Flux
    import rx.Observable
    import rx.RxReactiveStreams
    import java.net.URI

    class CitiesHystrixCommand(
            private val webClientBuilder: WebClient.Builder,
            private val citiesBaseUrl: String
    ) : HystrixObservableCommand<City>(
            HystrixObservableCommand.Setter
                    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("cities-service"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("cities-service"))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                            .withExecutionTimeoutInMilliseconds(4000))) {

        // The remote call that Hystrix protects.
        override fun construct(): Observable<City> {
            val buildUri: URI = UriComponentsBuilder
                    .fromUriString(citiesBaseUrl)
                    .path("/cities")
                    .build()
                    .encode()
                    .toUri()

            val webClient: WebClient = this.webClientBuilder.build()

            val result: Flux<City> = webClient.get()
                    .uri(buildUri)
                    .accept(MediaType.APPLICATION_JSON)
                    .exchange()
                    .flatMapMany { clientResponse ->
                        clientResponse.bodyToFlux<City>()
                    }

            // Hystrix expects an RxJava 1 Observable, so adapt the Reactor Flux.
            return RxReactiveStreams.toObservable(result)
        }

        // Invoked on failure or timeout: log the cause and return an empty result.
        override fun resumeWithFallback(): Observable<City> {
            LOGGER.error("Falling back on cities call", executionException)
            return Observable.empty()
        }

        companion object {
            private val LOGGER: Logger = LoggerFactory.getLogger(CitiesHystrixCommand::class.java)
        }
    }
This command can now be used to make the remote call in the following way:
    import org.bk.samples.model.City
    import org.springframework.web.reactive.function.client.WebClient
    import reactor.core.publisher.Flux
    import rx.Observable
    import rx.RxReactiveStreams
    import rx.schedulers.Schedulers

    class CitiesHystrixCommandBasedClient(
            private val webClientBuilder: WebClient.Builder,
            private val citiesBaseUrl: String
    ) {
        fun getCities(): Flux<City> {
            // Run the Hystrix command and get back an RxJava 1 Observable.
            val citiesObservable: Observable<City> = CitiesHystrixCommand(webClientBuilder, citiesBaseUrl)
                    .observe()
                    .subscribeOn(Schedulers.io())

            // Adapt the Observable back to a Reactor Flux for the rest of the application.
            return Flux
                    .from(RxReactiveStreams
                            .toPublisher(citiesObservable))
        }
    }
Two things to note here:
1. WebClient returns a Project Reactor “Flux” representing the list of cities. Hystrix, however, is based on RxJava 1, so the Flux is converted to an RxJava Observable using the “RxReactiveStreams.toObservable()” call provided by the RxJavaReactiveStreams library.
2. I still want the Project Reactor “Flux” type to be used in the rest of the application, so a second adapter, “Flux.from(RxReactiveStreams.toPublisher(citiesObservable))”, converts the RxJava Observable back to a Flux once the call wrapped in Hystrix returns.
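The two conversions can be seen in isolation, without Hystrix or WebClient in the picture. The following is a small sketch, assuming Reactor and the RxJavaReactiveStreams library are on the classpath; the function names `toRxObservable`, `toReactorFlux` and `roundTripCityNames` are hypothetical helpers introduced here for illustration:

```kotlin
import reactor.core.publisher.Flux
import rx.Observable
import rx.RxReactiveStreams

// Reactor -> RxJava 1: the direction used before handing the stream to Hystrix.
fun <T : Any> toRxObservable(flux: Flux<T>): Observable<T> =
    RxReactiveStreams.toObservable(flux)

// RxJava 1 -> Reactor: the direction used once the Hystrix-wrapped call returns.
fun <T : Any> toReactorFlux(observable: Observable<T>): Flux<T> =
    Flux.from(RxReactiveStreams.toPublisher(observable))

// Hypothetical demo: sends three city names through both adapters and collects them.
fun roundTripCityNames(): List<String> {
    val names: Flux<String> = Flux.just("Portland", "Seattle", "SFO")
    return toReactorFlux(toRxObservable(names))
        .collectList()
        .block()       // blocking is acceptable here only because this is a demo
        .orEmpty()
}
```

Both adapters go through the Reactive Streams `Publisher` interface, so the elements and their order pass through unchanged; only the wrapper type differs.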
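If I try this client against the WireMock sample with the 5 second delay, it handles the delay correctly: the command times out once the configured execution timeout elapses (4000 ms in the command above) and returns the empty fallback result instead of waiting for the full response.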
Functional approach
The previous approach involves a lot of boilerplate, which can be avoided by using HystrixCommands, a utility class that comes with Spring Cloud and provides a functional way of wrapping a remote call with Hystrix.
The entirety of the call using HystrixCommands looks like this:
    import com.netflix.hystrix.HystrixCommandProperties
    import org.bk.samples.model.City
    import org.slf4j.Logger
    import org.slf4j.LoggerFactory
    import org.springframework.cloud.netflix.hystrix.HystrixCommands
    import org.springframework.http.MediaType
    import org.springframework.web.reactive.function.client.WebClient
    import org.springframework.web.reactive.function.client.bodyToFlux
    import org.springframework.web.util.UriComponentsBuilder
    import reactor.core.publisher.Flux
    import rx.schedulers.Schedulers
    import java.net.URI

    class CitiesFunctionalHystrixClient(
            private val webClientBuilder: WebClient.Builder,
            private val citiesBaseUrl: String
    ) {
        fun getCities(): Flux<City> {
            return HystrixCommands
                    .from(callCitiesService())
                    .commandName("cities-service")
                    .groupName("cities-service")
                    .commandProperties(
                            HystrixCommandProperties.Setter()
                                    .withExecutionTimeoutInMilliseconds(1000)
                    )
                    .toObservable { obs ->
                        obs.observe()
                                .subscribeOn(Schedulers.io())
                    }
                    // Invoked on failure or timeout: log the cause and return an empty result.
                    .fallback { t: Throwable ->
                        LOGGER.error(t.message, t)
                        Flux.empty()
                    }
                    .toFlux()
        }

        // The plain remote call, with no Hystrix-specific code in it.
        fun callCitiesService(): Flux<City> {
            val buildUri: URI = UriComponentsBuilder
                    .fromUriString(citiesBaseUrl)
                    .path("/cities")
                    .build()
                    .encode()
                    .toUri()

            val webClient: WebClient = this.webClientBuilder.build()

            return webClient.get()
                    .uri(buildUri)
                    .accept(MediaType.APPLICATION_JSON)
                    .exchange()
                    .flatMapMany { clientResponse ->
                        clientResponse.bodyToFlux<City>()
                    }
        }

        companion object {
            // Log under this class's own name rather than CitiesHystrixCommand.
            private val LOGGER: Logger = LoggerFactory.getLogger(CitiesFunctionalHystrixClient::class.java)
        }
    }
A lot of boilerplate is avoided with this approach:
1. An explicit command is no longer required.
2. The call and the fallback are coded in a fluent manner.
3. Any overrides can be explicitly specified; in this instance, the timeout of 1 second.
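Overrides other than the timeout can be set on the same `HystrixCommandProperties.Setter`. As a rough sketch, the properties could be pulled into a small helper; the function name `citiesCommandProperties` is hypothetical, and the circuit breaker values below are illustrative assumptions rather than settings taken from the sample:

```kotlin
import com.netflix.hystrix.HystrixCommandProperties

// Hypothetical helper that collects the overrides for the cities call in one place.
// Only the 1000 ms timeout comes from the post; the other values are assumptions.
fun citiesCommandProperties(): HystrixCommandProperties.Setter =
    HystrixCommandProperties.Setter()
        // Same 1 second execution timeout as in the functional client.
        .withExecutionTimeoutInMilliseconds(1000)
        // Minimum number of requests in the rolling window before the breaker can trip.
        .withCircuitBreakerRequestVolumeThreshold(10)
        // Error percentage at or above which the breaker opens.
        .withCircuitBreakerErrorThresholdPercentage(50)
        // How long the breaker stays open before a trial request is let through.
        .withCircuitBreakerSleepWindowInMilliseconds(5000)
```

The returned Setter could then be passed to `.commandProperties(...)` in the fluent chain in place of the inline one, which keeps tuning values out of the call site.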
Conclusion
I like the conciseness that HystrixCommands brings to using Hystrix with WebClient. The entire sample is available in my GitHub repo, https://github.com/bijukunjummen/webclient-hystrix-sample, and all the dependencies required to get the samples to work are part of this repo. If you are interested in sticking with RxJava 1, then an approach described here may help you avoid boilerplate with vanilla Hystrix.
Published on Java Code Geeks with permission by Biju Kunjummen, partner at our JCG program. See the original article here: Functional Hystrix using Spring Cloud HystrixCommands. Opinions expressed by Java Code Geeks contributors are their own.