Beginner’s Guide to Hazelcast Part 4
This is the fourth installment of my Hazelcast series. If one has not seen the other 3, I suggest one go to Part 1, Part 2 and Part 3.
Logging
Logging is an important feature of any application and my examples are no different. System.out.println
can be a useful tool for telling the user what is going on in console applications. But let’s face it, if one is reading how to use a tool for distributed applications, that person is really not a beginner. Seeing a series of logging messages should not scare anyone. In fact, for the examples in this post, they are necessary to know what is going on by whom. We will be talking about threaded programming after all.
The good folks at Hazelcast seem to have agreed that logging is important and so have many different ways to define what library is logging. The logging framework only depends on JDK logging and has a number of adapters that even allow for custom logging frameworks. One’s logging adapter of choice is set by the property, hazelcast.logging.type to the following settings:
- JDK logging, This is the default.
- log4j
- slf4j
- none
I used Log4j2 so I picked slf4j and put in the four jar files needed to get it working.
Spinning Distributed Threads
Like many classes in Hazelcast, IExecutorService implements an interface from Java’s libraries, the ExecutorService. This interface defines what it is to be a thread pool. The interface is part of the java.util.concurrent package and has been around since Java 1.5. The package also has implementations of it one can access from java.util.concurrent.Executors. I wish I had something like this in Java 1.4 or 1.3 or 1.2 or 1.1. Making thread pools were fun until deadlocks happened. Now I can use Java library’s pools, good enough for me.
ExecutorServices have an interesting “feature.” One must shut them down or the service will not go away. The first time I used them, I caused a memory leak and shutdown the JVM. I caught the bug during my own testing so the customer never had to see my learning experience. IExecutorService has a different wrinkle. The service will not go away until all the threads finish. This caused many unclean shutdowns. You have been warned!
IExecutorServices can share out threads several different ways. Here they are in detail:
Any ‘Ole Instance
This is when one calls just the submit(Callable call).
This does more than just set a thread randomly into the cluster. It does some load balancing with that thread so an instance does not get clobbered with threads.
To a Particular Member
This is done via the submit(Callable call, Member member)
method. This sends a thread to a particular member of the cluster. No load balancing here; just sending to a member. Be careful, one can easily overload a member and really put the brakes on any processing being done. I could see this as a way to create a custom load balancer.
To a Collection of Members
Yeah, one can send a thread to multiple members. When i was doing my example coding, the members all act like they got their own thread and are not sharing one. If one implements Callable<T> as their thread implementation, the method returns a Map of Futures using the members as the key. If one uses Runnable, it returns nothing.
To The Member With the Right Key
Entries to a IMap can be anywhere on the cluster. If processing is needed to be done on that entry, a local thread would have to pull up the entry over the network. This can be a problem if the entry is very large. A better way would be to transfer the hopefully smaller thread over to the entry. To do that, the cluster needs to know where to send it. Hence, the call submit(Callable call, Object key)
.
To All Members
This works the same way as submitting to a collection of members but it is all of them, as in every member in the cluster. This could get “fun” if one has a large number of members in a cluster. I think I have heard as much as a 1000 members in one cluster. Make sure this is what one wants before it is called.
Using an ExecutionCallback
This is basically a way to send out some threads and let the results come back asynchronously. One uses an ExecutionCallback if one thread is submitted. One uses MultiExecutionCallback if more than one member is involved.
Example Code
Before I start, let me say that I do not have an example for every method in IExecutorService. I do have an example for every type discussed, however. Another thing about the example code. For instructional purposes I have done some copy-and-paste coding in prior posts so each example can stand on its own and one can get a context of what goes where. I did this quite a bit in part 3. If one did not notice it, look at it again.
This time I did not do it because there would have been a lot of code copied and the results would have been pretty ugly. I used an Enum and I think the results were very good. I thought an Enum was a good choice because of the limited number of examples and allowed me to be able to show the code in chunks that are understandable if the framework was shown first.
With that explanation, lets move on!
Framework
This are the main bits. It consists of the main class and the thread class. Notice how the main class shows each way a thread can submitted being called.
Main
package hazelcastservice; import com.hazelcast.core.Hazelcast; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.IExecutorService; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.RejectedExecutionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author Daryl */ public class Main { private static final Logger logger = LoggerFactory.getLogger(Main.class); public static final String SERVICE_NAME = "spinnerella"; public static final int NUM_INSTANCES = 5; /** * @param args the command line arguments */ public static void main(String[] args) { System.setProperty("hazelcast.logging.type", "slf4j"); List<HazelcastInstance> instances = new ArrayList<>(NUM_INSTANCES); for(int i = 0; i < NUM_INSTANCES; i++) { instances.add(Hazelcast.newHazelcastInstance()); logger.info("instance {} up", i); } IExecutorService spinner = instances.get(0).getExecutorService(SERVICE_NAME); try { HazelcastIExecutorServiceExamples.TO_SOME_MEMBER.example(instances, spinner); HazelcastIExecutorServiceExamples.TO_PARTICULAR_MEMBER.example(instances, spinner); HazelcastIExecutorServiceExamples.ON_THE_KEY_OWNER.example(instances, spinner); HazelcastIExecutorServiceExamples.ON_A_SET_OF_MEMBERS.example(instances, spinner); HazelcastIExecutorServiceExamples.ON_ALL_MEMBERS.example(instances, spinner); HazelcastIExecutorServiceExamples.CALLBACK.example(instances, spinner); HazelcastIExecutorServiceExamples.MULTIPLE_MEMBERS_WITH_CALLBACK.example(instances, spinner); //Lets setup a loop to make sure they are all done (Especially the callback ones) for(HazelcastIExecutorServiceExamples example: HazelcastIExecutorServiceExamples.values()) { while(!example.isDone()) { Thread.sleep(1000); } } } catch(ExecutionException ee) { logger.warn("Can't finish the job", ee); } catch(InterruptedException ie) { logger.warn("Everybody out of the pool", ie); } finally { // time to clean up my toys boolean allClear = false; while(!allClear) { try { Thread.sleep(1000); Hazelcast.shutdownAll(); allClear = true; } catch(InterruptedException ie) { //got interrupted. try again } catch(RejectedExecutionException ree) { logger.debug("caught a RejectedExecutionException"); allClear = false; } } logger.info("All done"); } } }
Thread
package hazelcastservice; import java.io.Serializable; import java.util.Random; import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * This class was inspired by the song "I Like to Move it" from the movie * Madagascar by Dreamworks. I offer NO apologies for using it. * * To those software developers who like consistent results, I used java.util.Random to * make it loop inconsistently each time call is called. * * Sometimes you need to make your own entertainment. * @author Daryl */ public class MoveItMoveIt implements Callable<Integer>, Serializable { private static final Logger logger = LoggerFactory.getLogger(MoveItMoveIt.class); private static final int UPPER_BOUND = 15; @Override public Integer call() throws Exception { Random random = new Random(); int howMany = random.nextInt(UPPER_BOUND); // int howMany = 2; for(int i = 0; i < howMany; i++) { logger.info("I like to Move it Move it!"); } logger.info("Move it!"); return howMany; } }
The Particulars
Here I go showing the different types of calls that were discussed. Remember that these are chunks of an Enum class. The done
is a protected variable and the public void example(List<HazelcastInstance> instances, IExecutorService spinner)
needed to implemented.
Any ‘Ole Instance
TO_SOME_MEMBER() { @Override public void example(List<HazelcastInstance> instances, IExecutorService spinner) throws ExecutionException, InterruptedException { logger.info("Submit to some member."); Future<Integer> howMany = spinner.submit(new MoveItMoveIt()); logger.info("It moved it {} times", howMany.get()); done = true; } }
To a Particular Member
TO_PARTICULAR_MEMBER { @Override public void example(List<HazelcastInstance> instances, IExecutorService spinner) throws ExecutionException, InterruptedException { logger.info("Submit to a particular member."); Member member = getRandomMember(instances); logger.debug("member is {}", member); Future<Integer> howMany = spinner.submitToMember(new MoveItMoveIt(), member); logger.info("It moved it {} times.", howMany.get()); done = true; } private Member getRandomMember(List<HazelcastInstance> instances) { Set<Member> members = instances.get(0).getCluster().getMembers(); int i = 0; int max = new Random().nextInt(instances.size()); Iterator<Member> iterator = members.iterator(); Member member = iterator.next(); while(iterator.hasNext() && (i < max)) { member = iterator.next(); i++; } return member; } }
To a Collection of Members
ON_A_SET_OF_MEMBERS { @Override public void example(List<HazelcastInstance> instances, IExecutorService spinner) throws ExecutionException, InterruptedException { logger.info("Send to some of the members"); Set<Member> randomMembers = getRandomMembers(instances); Map<Member, Future<Integer>> results = spinner.submitToMembers(new MoveItMoveIt(), randomMembers); for(Future<Integer> howMany: results.values()) { logger.info("It moved {} times", howMany.get()); } done = true; } private Set<Member> getRandomMembers(List<HazelcastInstance> instances) { int max = new Random().nextInt(instances.size()); Set<Member> newSet = new HashSet<>(instances.size()); int k = 0; Iterator<Member> i = instances.get(0).getCluster().getMembers().iterator(); while(i.hasNext() && k < max) { newSet.add(i.next()); k++; } return newSet; } }
To The Member With the Right Key
ON_THE_KEY_OWNER { @Override public void example(List<HazelcastInstance> instances, IExecutorService spinner) throws ExecutionException, InterruptedException { logger.info("Send to the one owning the key"); HazelcastInstance randomInstance = getRandomInstance(instances); IMap<Long, Boolean> map = randomInstance.getMap("default"); Long one = 1L; map.put(one, Boolean.TRUE); Future<Integer> howMany = spinner.submitToKeyOwner(new MoveItMoveIt(), one); logger.info("It moved it {} times.", howMany.get()); done = true; } private HazelcastInstance getRandomInstance(List<HazelcastInstance> instances) { return instances.get(new Random().nextInt(instances.size())); } }
To All Members
ON_ALL_MEMBERS { @Override public void example(List<HazelcastInstance> instances, IExecutorService spinner) throws ExecutionException, InterruptedException { logger.info("Send to all members"); Map<Member, Future<Integer>> results = spinner.submitToAllMembers(new MoveItMoveIt()); for(Future<Integer> howMany: results.values()) { logger.info("It moved {} times", howMany.get()); } done = true; } }
Using an ExecutionCallback
This example code contains two chunks of code to show a single callback and a multiple callback.
CALLBACK { @Override public void example(List<HazelcastInstance> instances, IExecutorService spinner) throws ExecutionException, InterruptedException { logger.info("example with a callback"); spinner.submit(new MoveItMoveIt(), new ExecutionCallback<Integer>() { @Override public void onResponse(Integer response) { logger.info("It moved {} times", response); done = true; } @Override public void onFailure(Throwable thrwbl) { logger.error("trouble in the callback", thrwbl); done = true; } }); } }, MULTIPLE_MEMBERS_WITH_CALLBACK { @Override public void example(List<HazelcastInstance> instances, IExecutorService spinner) throws ExecutionException, InterruptedException { logger.info("running on multiple members with callback"); spinner.submitToAllMembers(new MoveItMoveIt(), new MultiExecutionCallback() { @Override public void onResponse(Member member, Object o) { logger.info("member finished with {} moves", o); } @Override public void onComplete(Map<Member, Object> map) { logger.info("All members completed"); for(Object value: map.values()) { logger.info("It moved {} times", value); } done = true; } }); }
Conclusion
It was good to publish my own code/ideas again on my blog. I took a quick look at the power of the IExecutorService by Hazelcast. My example code followed the DRY principle. The code in its entirety can be found here.
References
As always with my Hazelcast guides, my information comes from Hazelcast documentation that can be found at here.
Reference: | Beginner’s Guide to Hazelcast Part 4 from our JCG partner Daryl Mathison at the Daryl Mathison’s Java Blog blog. |