Gentle Introduction to Hystrix – Wrapup
This is a follow-up to two other posts: one on the motivation for why something like Hystrix is needed in a distributed system, and a basic intro to Hystrix.
This post wraps up my Hystrix journey with details of the various properties that can be tweaked to change the behavior of Hystrix, and touches on a few advanced concepts.
Tweaking Hystrix Behavior
Hystrix configuration is explained in detail in the Hystrix wiki. In brief, two broad groups of properties control the behavior of Hystrix:
- Command Properties
- ThreadPool properties
The properties follow an order of precedence that is explained in the wiki; here I will concentrate on the ones specified through a properties file.
For a sample Command defined the following way:
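import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HelloWorldCommand extends HystrixCommand<String> {

    private static final Logger logger = LoggerFactory.getLogger(HelloWorldCommand.class);

    private final String name;

    public HelloWorldCommand(String name) {
        // The group key is also used as the thread pool key unless one is set explicitly
        super(HystrixCommandGroupKey.Factory.asKey("default"));
        this.name = name;
    }

    @Override
    protected String run() throws Exception {
        logger.info("HelloWorld Command Invoked");
        return "Hello " + name;
    }
}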
The first behavior that can be tweaked is whether to execute the command in a thread pool or in the same thread of execution as the caller (the SEMAPHORE strategy type). If the execution is in a thread pool, then a timeout for the request can be set.
hystrix.command.HelloWorldCommand.execution.isolation.strategy=THREAD
hystrix.command.HelloWorldCommand.execution.isolation.thread.timeoutInMilliseconds=1000
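To make the THREAD strategy a little more concrete, here is a minimal sketch of the idea using only java.util.concurrent. It is not Hystrix code: TimeoutIsolationSketch and callWithTimeout are hypothetical names, and returning a fallback value on timeout is an assumption made for this illustration. The point is only that running the work on a separate thread is what lets the caller stop waiting after a fixed time.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Hystrix: runs a task on a separate thread
// and stops waiting after a timeout, returning a fallback value instead.
class TimeoutIsolationSketch {

    static String callWithTimeout(Callable<String> task, long timeoutMillis, String fallback) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(task);
        try {
            // The caller thread waits at most timeoutMillis for the result
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker thread
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException e) {
            return fallback; // the task itself failed
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Finishes well within the timeout: prints "Hello World"
        System.out.println(callWithTimeout(() -> "Hello World", 1000, "fallback"));
        // Takes longer than the timeout: prints "fallback"
        System.out.println(callWithTimeout(() -> {
            Thread.sleep(5000);
            return "too late";
        }, 100, "fallback"));
    }
}
```

With the SEMAPHORE strategy the work runs on the caller's own thread, so there is no second thread that can be abandoned in this way.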
The second behavior is the circuit breaker, which works based on information collected during a rolling window of time, configured this way, say for 10 seconds:
hystrix.command.HelloWorldCommand.metrics.rollingStats.timeInMilliseconds=10000
In this window, if a certain percentage of requests fail (say 50%) once a threshold number of requests has been reached (say 20 in 10 seconds), then the circuit is broken, with a configuration which looks like this:
hystrix.command.HelloWorldCommand.circuitBreaker.requestVolumeThreshold=20
hystrix.command.HelloWorldCommand.circuitBreaker.errorThresholdPercentage=50
Once a circuit is broken, it stays that way for a time set the following way, 5 seconds in this instance:
hystrix.command.HelloWorldCommand.circuitBreaker.sleepWindowInMilliseconds=5000
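The way these three circuit breaker settings interact can be sketched in a few lines of plain Java. This is a simplified model written for this post, not Hystrix's implementation: CircuitBreakerSketch, recordWindow and allowRequest are hypothetical names, the window totals are passed in by hand, and time is supplied as a parameter so the behavior is easy to follow.

```java
// Simplified model of the trip decision; not Hystrix's actual implementation.
class CircuitBreakerSketch {

    private final int requestVolumeThreshold;   // e.g. 20 requests per window
    private final int errorThresholdPercentage; // e.g. 50 percent
    private final long sleepWindowMillis;       // e.g. 5000 ms
    private long openedAtMillis = -1;           // -1 means the circuit is closed

    CircuitBreakerSketch(int requestVolumeThreshold, int errorThresholdPercentage, long sleepWindowMillis) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMillis = sleepWindowMillis;
    }

    // Called with the totals observed in the current rolling window.
    void recordWindow(int totalRequests, int failedRequests, long nowMillis) {
        if (totalRequests < requestVolumeThreshold) {
            return; // too little traffic to judge, the circuit stays as it is
        }
        int errorPercentage = failedRequests * 100 / totalRequests;
        if (errorPercentage >= errorThresholdPercentage) {
            openedAtMillis = nowMillis; // break the circuit
        }
    }

    // While the circuit is open, requests are rejected until the sleep window has elapsed.
    // Simplification: Hystrix lets a single trial request through at that point and
    // closes the circuit only if that request succeeds.
    boolean allowRequest(long nowMillis) {
        if (openedAtMillis < 0) {
            return true;
        }
        return nowMillis - openedAtMillis >= sleepWindowMillis;
    }

    public static void main(String[] args) {
        CircuitBreakerSketch breaker = new CircuitBreakerSketch(20, 50, 5000);
        breaker.recordWindow(10, 10, 0);     // all failed, but below the volume threshold
        System.out.println(breaker.allowRequest(0));    // true
        breaker.recordWindow(20, 10, 1000);  // 20 requests, 50% failed: circuit opens
        System.out.println(breaker.allowRequest(3000)); // false
        System.out.println(breaker.allowRequest(6000)); // true, sleep window elapsed
    }
}
```

Note how ten failures out of ten requests do not open the circuit, because the request volume threshold has not been reached yet.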
The thread pool settings are controlled using the Group Key that was specified, called default in this sample. A specific “Threadpool Key” could also have been specified as part of the constructor.
hystrix.threadpool.default.coreSize=10
hystrix.threadpool.default.queueSizeRejectionThreshold=5
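Here 10 commands can potentially be run in parallel and another 5 held in a queue, beyond which requests will be rejected. Note that queueSizeRejectionThreshold only takes effect when hystrix.threadpool.default.maxQueueSize is set to a positive value; with its default of -1 a SynchronousQueue is used and no requests are queued.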
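The "10 running, 5 queued, the rest rejected" behavior can be reproduced with a plain ThreadPoolExecutor. This is only an illustration of the sizing arithmetic with the JDK executor, assuming a bounded LinkedBlockingQueue; BoundedPoolSketch and countRejections are hypothetical names and this is not how Hystrix wires its pools internally.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustration only: a fixed-size pool with a bounded queue rejects work once both are full.
class BoundedPoolSketch {

    // Submits `tasks` tasks that all block until released, and returns how many were rejected.
    static int countRejections(int coreSize, int queueCapacity, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, coreSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until the end
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // all threads busy and the queue is full
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 10 tasks run, 5 wait in the queue, the remaining 5 are rejected
        System.out.println(countRejections(10, 5, 20)); // prints 5
    }
}
```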
Request Collapsing
Tomasz Nurkiewicz, on his blog NoBlogDefFound, has done an excellent job of explaining Request Collapsing. My example is a little simplistic. Consider a case where a lot of requests are being made to retrieve a Person given an id, the following way:
import java.util.List;
import java.util.stream.Collectors;

public class PersonService {

    // Single lookup
    public Person findPerson(Integer id) {
        return new Person(id, "name : " + id);
    }

    // Batched lookup for a list of ids
    public List<Person> findPeople(List<Integer> ids) {
        return ids
                .stream()
                .map(i -> new Person(i, "name : " + i))
                .collect(Collectors.toList());
    }
}
The service responds with a canned response, but assume that the call was to a remote datastore. Also note that this service implements a batched method to retrieve a list of people given a list of ids.
Request Collapsing is a feature which batches multiple user requests occurring over a period of time into a single such remote call and then fans the responses back out to the individual callers.
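Before looking at the Hystrix classes, the batching and fan-out idea can be sketched with plain CompletableFutures. This is a hypothetical, manually flushed collapser written for illustration (ManualCollapserSketch, submit, flush and batchCalls are not Hystrix APIs); Hystrix flushes on a timer rather than on an explicit call.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical, manually flushed collapser; not a Hystrix class.
class ManualCollapserSketch<K, V> {

    private final Function<List<K>, Map<K, V>> batchCall;
    private final Map<K, CompletableFuture<V>> pending = new LinkedHashMap<>();
    private int batchCalls = 0;

    ManualCollapserSketch(Function<List<K>, Map<K, V>> batchCall) {
        this.batchCall = batchCall;
    }

    // Queue a single request; each caller gets a future for its own result.
    synchronized CompletableFuture<V> submit(K key) {
        return pending.computeIfAbsent(key, k -> new CompletableFuture<>());
    }

    // Make one batched call for everything queued so far, then fan the results back out.
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        Map<K, CompletableFuture<V>> batch = new LinkedHashMap<>(pending);
        pending.clear();
        batchCalls++;
        Map<K, V> results = batchCall.apply(new ArrayList<>(batch.keySet()));
        batch.forEach((key, future) -> future.complete(results.get(key)));
    }

    synchronized int batchCalls() {
        return batchCalls;
    }

    public static void main(String[] args) {
        // Stand-in for the remote batched lookup
        ManualCollapserSketch<Integer, String> collapser =
                new ManualCollapserSketch<Integer, String>(ids -> {
                    Map<Integer, String> people = new LinkedHashMap<>();
                    ids.forEach(id -> people.put(id, "name : " + id));
                    return people;
                });

        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            futures.add(collapser.submit(i)); // three individual requests
        }
        collapser.flush();                    // one batched call
        futures.forEach(f -> System.out.println(f.join()));
        System.out.println("batch calls: " + collapser.batchCalls());
    }
}
```

Three callers each get their own result, but the batched lookup is invoked only once.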
A Hystrix command which takes the list of ids and returns the matching people can be defined the following way:
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PersonRequestCommand extends HystrixCommand<List<Person>> {

    private static final Logger logger = LoggerFactory.getLogger(PersonRequestCommand.class);

    private final List<Integer> ids;
    private final PersonService personService = new PersonService();

    public PersonRequestCommand(List<Integer> ids) {
        super(HystrixCommandGroupKey.Factory.asKey("default"));
        this.ids = ids;
    }

    @Override
    protected List<Person> run() throws Exception {
        logger.info("Retrieving details for : " + this.ids);
        // One batched call for all the ids
        return personService.findPeople(this.ids);
    }
}
This is fairly straightforward up to this point; the complicated logic is now in the RequestCollapser, which looks like this:
package aggregate.commands.collapsed;

import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.HystrixCommand;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PersonRequestCollapser extends HystrixCollapser<List<Person>, Person, Integer> {

    private final Integer id;

    public PersonRequestCollapser(Integer id) {
        // Requests arriving within the 2 second timer delay are collapsed into one batch
        super(Setter.
                withCollapserKey(HystrixCollapserKey.Factory.asKey("personRequestCollapser"))
                .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(2000)));
        this.id = id;
    }

    @Override
    public Integer getRequestArgument() {
        return this.id;
    }

    // Builds the single batch command from all the collapsed requests
    @Override
    protected HystrixCommand<List<Person>> createCommand(Collection<CollapsedRequest<Person, Integer>> collapsedRequests) {
        List<Integer> ids = collapsedRequests.stream().map(cr -> cr.getArgument()).collect(Collectors.toList());
        return new PersonRequestCommand(ids);
    }

    // Maps each Person in the batch response back to the request that asked for it
    @Override
    protected void mapResponseToRequests(List<Person> batchResponse, Collection<CollapsedRequest<Person, Integer>> collapsedRequests) {
        Map<Integer, Person> personMap = batchResponse.stream().collect(Collectors.toMap(Person::getId, Function.identity()));
        for (CollapsedRequest<Person, Integer> cr : collapsedRequests) {
            cr.setResponse(personMap.get(cr.getArgument()));
        }
    }
}
There are a few things going on here. First, the types in the parameterized type signature indicate the type of the batch response (List<Person>), the response type expected by the caller (Person) and the type of the request argument (Integer, the id of the person). Then there are two methods, one to create the batch command and a second to map the responses back to the original requests.
Now, given this, nothing much changes from a user's perspective: the call is made as if to a single command, and Request Collapsing handles batching, dispatching and mapping back the responses. This is what a sample test looks like:
@Test
public void testCollapse() throws Exception {
    HystrixRequestContext requestContext = HystrixRequestContext.initializeContext();

    logger.info("About to execute Collapsed command");
    List<Observable<Person>> result = new ArrayList<>();
    CountDownLatch cl = new CountDownLatch(1);
    for (int i = 1; i <= 100; i++) {
        result.add(new PersonRequestCollapser(i).observe());
    }

    Observable.merge(result).subscribe(p -> logger.info(p.toString())
            , t -> logger.error(t.getMessage(), t)
            , () -> cl.countDown());
    cl.await();
    logger.info("Completed executing Collapsed Command");
    requestContext.shutdown();
}
Conclusion
There is far more to Hystrix than what I have covered here. It is truly an awesome library, essential in creating a resilient system, and I have come to appreciate the amount of thought that has gone into designing it.
Reference: Gentle Introduction to Hystrix – Wrapup from our JCG partner Biju Kunjummen at the all and sundry blog.