Hazelcast Distributed Execution with Spring
The ExecutorService interface was introduced in Java 5 and lives in the java.util.concurrent package. It extends the Executor interface and provides thread pool functionality for executing short asynchronous tasks. The article Java Executor Service Types is a suggested overview of the basic ExecutorService implementations.
ThreadPoolExecutor is also a very useful implementation of the ExecutorService interface. It extends AbstractExecutorService, which provides default implementations of the ExecutorService execution methods. It provides improved performance when executing large numbers of asynchronous tasks and maintains basic statistics, such as the number of completed tasks.
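As a small illustration of the statistics mentioned above, the following sketch runs a few short tasks on a ThreadPoolExecutor and then reads its completed-task counter. The class name ThreadPoolStatsDemo, the method runTasks and the pool sizes are assumptions made for this example only; they are not part of the project built in this article.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo class; not part of the article's project. */
class ThreadPoolStatsDemo {

    /** Runs taskCount short tasks and returns the pool's completed-task count. */
    static long runTasks(int taskCount) {
        // 2 core threads, at most 4 threads, idle non-core threads kept for 30 seconds.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            pool.execute(new Runnable() {
                public void run() {
                    System.out.println("Task " + taskId + " ran on "
                            + Thread.currentThread().getName());
                }
            });
        }

        // Stop accepting new tasks; tasks that are already queued still run.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // The counter is only approximate while the pool is running,
        // so it is read after the pool has terminated.
        return pool.getCompletedTaskCount();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runTasks(5));
    }
}
```

Reading the counter only after awaitTermination is a deliberate choice here: getCompletedTaskCount is documented as an approximation while tasks are still in flight.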
The article How to develop and monitor Thread Pool Services by using Spring is also suggested for details on developing and monitoring thread pool services.
So far, we have talked only about non-distributed Executor Service implementations. Let us now look at the Distributed Executor Service.
The Hazelcast Distributed Executor Service is a distributed implementation of java.util.concurrent.ExecutorService. It allows business logic to be executed in the cluster. There are four alternative ways to do this :
- The logic can be executed on a specific cluster member that you choose.
- The logic can be executed on the member that owns a key that you choose.
- The logic can be executed on a member that Hazelcast picks.
- The logic can be executed on all or a subset of the cluster members.
This article shows how to develop Distributed Executor Service via Hazelcast and Spring.
Used Technologies :
- JDK 1.7.0_09
- Spring 3.1.3
- Hazelcast 2.4
- Maven 3.0.4
STEP 1 : CREATE MAVEN PROJECT
A Maven project is created as below. (It can be created by using Maven or an IDE plug-in.)
STEP 2 : LIBRARIES
Firstly, the Spring, Hazelcast and Log4j dependencies are added to Maven’s pom.xml :

<properties>
    <spring.version>3.1.3.RELEASE</spring.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
    <!-- Spring 3 dependencies -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>

    <!-- Hazelcast library -->
    <dependency>
        <groupId>com.hazelcast</groupId>
        <artifactId>hazelcast-all</artifactId>
        <version>2.4</version>
    </dependency>

    <!-- Log4j library -->
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.16</version>
    </dependency>
</dependencies>

The maven-compiler-plugin (Maven plugin) is used to compile the project with JDK 1.7 :

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.0</version>
    <configuration>
        <source>1.7</source>
        <target>1.7</target>
    </configuration>
</plugin>

The maven-shade-plugin (Maven plugin) can be used to create a runnable jar :

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.onlinetechvision.exe.Application</mainClass>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.handlers</resource>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>META-INF/spring.schemas</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
STEP 3 : CREATE Customer BEAN
A new Customer bean is created. This bean will be distributed between the two nodes of the OTV cluster. In the following sample, all defined properties (id, name and surname) are of type String, and the standard java.io.Serializable interface is implemented for serialization. If custom or third-party object types are used, the com.hazelcast.nio.DataSerializable interface can be implemented for better serialization performance.

package com.onlinetechvision.customer;

import java.io.Serializable;

/**
 * Customer Bean.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Customer implements Serializable {

    private static final long serialVersionUID = 1856862670651243395L;

    private String id;
    private String name;
    private String surname;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSurname() {
        return surname;
    }

    public void setSurname(String surname) {
        this.surname = surname;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((id == null) ? 0 : id.hashCode());
        result = prime * result + ((name == null) ? 0 : name.hashCode());
        result = prime * result + ((surname == null) ? 0 : surname.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Customer other = (Customer) obj;
        if (id == null) {
            if (other.id != null)
                return false;
        } else if (!id.equals(other.id))
            return false;
        if (name == null) {
            if (other.name != null)
                return false;
        } else if (!name.equals(other.name))
            return false;
        if (surname == null) {
            if (other.surname != null)
                return false;
        } else if (!surname.equals(other.surname))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return "Customer [id=" + id + ", name=" + name + ", surname=" + surname + "]";
    }

}
STEP 4 : CREATE ICacheService INTERFACE
A new ICacheService interface is created for the service layer to expose cache functionality.

package com.onlinetechvision.cache.srv;

import com.hazelcast.core.IMap;
import com.onlinetechvision.customer.Customer;

/**
 * A new ICacheService Interface is created for service layer to expose cache functionality.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public interface ICacheService {

    /**
     * Adds Customer entries to cache
     *
     * @param key the cache key
     * @param customer the Customer to store
     *
     */
    void addToCache(String key, Customer customer);

    /**
     * Deletes Customer entries from cache
     *
     * @param key the cache key
     *
     */
    void deleteFromCache(String key);

    /**
     * Gets Customer cache
     *
     * @return the Hazelcast distributed map that holds the Customer entries
     */
    IMap<String, Customer> getCache();

}
STEP 5 : CREATE CacheService IMPLEMENTATION
CacheService is the implementation of the ICacheService interface.

package com.onlinetechvision.cache.srv;

import com.hazelcast.core.IMap;
import com.onlinetechvision.customer.Customer;
import com.onlinetechvision.test.listener.CustomerEntryListener;

/**
 * CacheService Class is implementation of ICacheService Interface.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class CacheService implements ICacheService {

    private IMap<String, Customer> customerMap;

    /**
     * Constructor of CacheService
     *
     * @param customerMap the distributed Customer map
     *
     */
    @SuppressWarnings("unchecked")
    public CacheService(IMap<String, Customer> customerMap) {
        setCustomerMap(customerMap);
        // Register a listener so that entry changes are printed on every member.
        getCustomerMap().addEntryListener(new CustomerEntryListener(), true);
    }

    /**
     * Adds Customer entries to cache
     *
     * @param key the cache key
     * @param customer the Customer to store
     *
     */
    @Override
    public void addToCache(String key, Customer customer) {
        getCustomerMap().put(key, customer);
    }

    /**
     * Deletes Customer entries from cache
     *
     * @param key the cache key
     *
     */
    @Override
    public void deleteFromCache(String key) {
        getCustomerMap().remove(key);
    }

    /**
     * Gets Customer cache
     *
     * @return the Hazelcast distributed map that holds the Customer entries
     */
    @Override
    public IMap<String, Customer> getCache() {
        return getCustomerMap();
    }

    public IMap<String, Customer> getCustomerMap() {
        return customerMap;
    }

    public void setCustomerMap(IMap<String, Customer> customerMap) {
        this.customerMap = customerMap;
    }

}
STEP 6 : CREATE IDistributedExecutorService INTERFACE
A new IDistributedExecutorService interface is created for the service layer to expose distributed execution functionality.

package com.onlinetechvision.executor.srv;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;

import com.hazelcast.core.Member;

/**
 * A new IDistributedExecutorService Interface is created for service layer to expose distributed execution functionality.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public interface IDistributedExecutorService {

    /**
     * Executes the callable object on stated member
     *
     * @param callable the task to execute
     * @param member the member that should run the task
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    String executeOnStatedMember(Callable<String> callable, Member member) throws InterruptedException, ExecutionException;

    /**
     * Executes the callable object on member owning the key
     *
     * @param callable the task to execute
     * @param key the key whose owner should run the task
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    String executeOnTheMemberOwningTheKey(Callable<String> callable, Object key) throws InterruptedException, ExecutionException;

    /**
     * Executes the callable object on any member
     *
     * @param callable the task to execute
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    String executeOnAnyMember(Callable<String> callable) throws InterruptedException, ExecutionException;

    /**
     * Executes the callable object on all members
     *
     * @param callable the task to execute
     * @param members the members that should run the task
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    Collection<String> executeOnMembers(Callable<String> callable, Set<Member> members) throws InterruptedException, ExecutionException;

}
STEP 7 : CREATE DistributedExecutorService IMPLEMENTATION
DistributedExecutorService is the implementation of the IDistributedExecutorService interface.

package com.onlinetechvision.executor.srv;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import org.apache.log4j.Logger;

import com.hazelcast.core.DistributedTask;
import com.hazelcast.core.Member;
import com.hazelcast.core.MultiTask;

/**
 * DistributedExecutorService Class is implementation of IDistributedExecutorService Interface.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class DistributedExecutorService implements IDistributedExecutorService {

    private static final Logger logger = Logger.getLogger(DistributedExecutorService.class);

    private ExecutorService hazelcastDistributedExecutorService;

    /**
     * Executes the callable object on stated member
     *
     * @param callable the task to execute
     * @param member the member that should run the task
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    @SuppressWarnings("unchecked")
    public String executeOnStatedMember(Callable<String> callable, Member member) throws InterruptedException, ExecutionException {
        logger.debug("Method executeOnStatedMember is called...");
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        FutureTask<String> task = (FutureTask<String>) executorService.submit(
                new DistributedTask<String>(callable, member));
        String result = task.get();
        logger.debug("Result of method executeOnStatedMember is : " + result);
        return result;
    }

    /**
     * Executes the callable object on member owning the key
     *
     * @param callable the task to execute
     * @param key the key whose owner should run the task
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    @SuppressWarnings("unchecked")
    public String executeOnTheMemberOwningTheKey(Callable<String> callable, Object key) throws InterruptedException, ExecutionException {
        logger.debug("Method executeOnTheMemberOwningTheKey is called...");
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        FutureTask<String> task = (FutureTask<String>) executorService.submit(
                new DistributedTask<String>(callable, key));
        String result = task.get();
        logger.debug("Result of method executeOnTheMemberOwningTheKey is : " + result);
        return result;
    }

    /**
     * Executes the callable object on any member
     *
     * @param callable the task to execute
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    public String executeOnAnyMember(Callable<String> callable) throws InterruptedException, ExecutionException {
        logger.debug("Method executeOnAnyMember is called...");
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        Future<String> task = executorService.submit(callable);
        String result = task.get();
        logger.debug("Result of method executeOnAnyMember is : " + result);
        return result;
    }

    /**
     * Executes the callable object on all members
     *
     * @param callable the task to execute
     * @param members the members that should run the task
     * @throws InterruptedException
     * @throws ExecutionException
     *
     */
    public Collection<String> executeOnMembers(Callable<String> callable, Set<Member> members) throws ExecutionException, InterruptedException {
        logger.debug("Method executeOnMembers is called...");
        MultiTask<String> task = new MultiTask<String>(callable, members);
        ExecutorService executorService = getHazelcastDistributedExecutorService();
        executorService.execute(task);
        Collection<String> results = task.get();
        logger.debug("Result of method executeOnMembers is : " + results.toString());
        return results;
    }

    public ExecutorService getHazelcastDistributedExecutorService() {
        return hazelcastDistributedExecutorService;
    }

    public void setHazelcastDistributedExecutorService(ExecutorService hazelcastDistributedExecutorService) {
        this.hazelcastDistributedExecutorService = hazelcastDistributedExecutorService;
    }

}
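The executeOnMembers method above submits one task to several members and blocks until every result has arrived. As a rough local analogy only, the sketch below does the same kind of fan-out and collection with a plain java.util.concurrent thread pool and invokeAll. It does not use Hazelcast, and the class name InvokeAllDemo and the method runOnAll are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical demo class; a local stand-in, not Hazelcast code. */
class InvokeAllDemo {

    /** Runs every task on a local pool and returns the results in task order. */
    static List<String> runOnAll(List<Callable<String>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<String> results = new ArrayList<String>();
            // invokeAll blocks until all tasks are done and returns the
            // futures in the same order as the submitted tasks.
            for (Future<String> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        tasks.add(new Callable<String>() {
            public String call() { return "first task result"; }
        });
        tasks.add(new Callable<String>() {
            public String call() { return "second task result"; }
        });
        System.out.println(runOnAll(tasks));
    }
}
```

The difference that matters in the distributed case is where each task runs: here both tasks share one JVM, while MultiTask sends a copy of the callable to every member in the given set.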
STEP 8 : CREATE TestCallable CLASS
The TestCallable class contains the business logic to be executed. It implements both Callable and Serializable, because the task has to be sent to the member that executes it.
TestCallable task for first member of the cluster :

package com.onlinetechvision.task;

import java.io.Serializable;
import java.util.concurrent.Callable;

/**
 * TestCallable Class shows business logic to be executed.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class TestCallable implements Callable<String>, Serializable {

    private static final long serialVersionUID = -1839169907337151877L;

    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return String computed result
     * @throws Exception if unable to compute a result
     */
    public String call() throws Exception {
        return "First Member's TestCallable Task is called...";
    }

}

TestCallable task for second member of the cluster :

package com.onlinetechvision.task;

import java.io.Serializable;
import java.util.concurrent.Callable;

/**
 * TestCallable Class shows business logic to be executed.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class TestCallable implements Callable<String>, Serializable {

    private static final long serialVersionUID = -1839169907337151877L;

    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return String computed result
     * @throws Exception if unable to compute a result
     */
    public String call() throws Exception {
        return "Second Member's TestCallable Task is called...";
    }

}
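To make clearer why a task like TestCallable implements Serializable, the following sketch turns a task into bytes, rebuilds it from those bytes and only then runs it, which is roughly what has to happen before a task can run on another member. It uses plain JDK serialization and a local thread pool, not Hazelcast's own wire format. SerializableTaskDemo, GreetingTask, toBytes and runFromBytes are hypothetical names used only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical demo class; not part of the article's project. */
class SerializableTaskDemo {

    /** Stand-in for TestCallable: a task that is both Callable and Serializable. */
    static class GreetingTask implements Callable<String>, Serializable {
        private static final long serialVersionUID = 1L;
        private final String memberName;

        GreetingTask(String memberName) {
            this.memberName = memberName;
        }

        public String call() {
            return memberName + "'s task is called...";
        }
    }

    /** Serializes the task to bytes, as needed before sending it over a network. */
    static byte[] toBytes(Serializable task) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(task);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return buffer.toByteArray();
    }

    /** Rebuilds the task from bytes and runs it on a local single-thread pool. */
    @SuppressWarnings("unchecked")
    static String runFromBytes(byte[] bytes) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            Callable<String> task = (Callable<String>) in.readObject();
            return pool.submit(task).get();
        } catch (IOException | ClassNotFoundException | ExecutionException e) {
            throw new IllegalStateException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        byte[] bytes = toBytes(new GreetingTask("First Member"));
        System.out.println(runFromBytes(bytes));
    }
}
```

Deserialization needs the task's class on the receiving side, which is consistent with every member in this article being started from a jar that contains a TestCallable class.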
STEP 9 : CREATE AnotherAvailableMemberNotFoundException CLASS
AnotherAvailableMemberNotFoundException is thrown when no other available member is found in the cluster. To avoid this exception, the first node should be started before the second node.

package com.onlinetechvision.exception;

/**
 * AnotherAvailableMemberNotFoundException is thrown when another available member is not found.
 * To avoid this exception, first node should be started before the second node.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class AnotherAvailableMemberNotFoundException extends Exception {

    private static final long serialVersionUID = -3954360266393077645L;

    /**
     * Constructor of AnotherAvailableMemberNotFoundException
     *
     * @param message the exception message
     *
     */
    public AnotherAvailableMemberNotFoundException(String message) {
        super(message);
    }

}
STEP 10 : CREATE CustomerEntryListener CLASS
The CustomerEntryListener class listens for entry changes on the named cache object.

package com.onlinetechvision.test.listener;

import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.EntryListener;

/**
 * CustomerEntryListener Class listens entry changes on named cache object.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
@SuppressWarnings("rawtypes")
public class CustomerEntryListener implements EntryListener {

    /**
     * Invoked when an entry is added.
     *
     * @param ee the entry event
     *
     */
    public void entryAdded(EntryEvent ee) {
        System.out.println("EntryAdded... Member : " + ee.getMember() + ", Key : " + ee.getKey()
                + ", OldValue : " + ee.getOldValue() + ", NewValue : " + ee.getValue());
    }

    /**
     * Invoked when an entry is removed.
     *
     * @param ee the entry event
     *
     */
    public void entryRemoved(EntryEvent ee) {
        System.out.println("EntryRemoved... Member : " + ee.getMember() + ", Key : " + ee.getKey()
                + ", OldValue : " + ee.getOldValue() + ", NewValue : " + ee.getValue());
    }

    /**
     * Invoked when an entry is evicted.
     *
     * @param ee the entry event
     *
     */
    public void entryEvicted(EntryEvent ee) {

    }

    /**
     * Invoked when an entry is updated.
     *
     * @param ee the entry event
     *
     */
    public void entryUpdated(EntryEvent ee) {

    }

}
STEP 11 : CREATE Starter CLASS
The Starter class loads Customers into the cache and executes the distributed tasks. The first member only loads its Customers, while the second member also executes the tasks.
Starter Class of first member of the cluster :

package com.onlinetechvision.exe;

import com.onlinetechvision.cache.srv.ICacheService;
import com.onlinetechvision.customer.Customer;

/**
 * Starter Class loads Customers to cache and executes distributed tasks.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Starter {

    private ICacheService cacheService;

    /**
     * Loads cache and executes the tasks
     *
     */
    public void start() {
        loadCacheForFirstMember();
    }

    /**
     * Loads Customers to cache
     *
     */
    public void loadCacheForFirstMember() {
        Customer firstCustomer = new Customer();
        firstCustomer.setId("1");
        firstCustomer.setName("Jodie");
        firstCustomer.setSurname("Foster");

        Customer secondCustomer = new Customer();
        secondCustomer.setId("2");
        secondCustomer.setName("Kate");
        secondCustomer.setSurname("Winslet");

        getCacheService().addToCache(firstCustomer.getId(), firstCustomer);
        getCacheService().addToCache(secondCustomer.getId(), secondCustomer);
    }

    public ICacheService getCacheService() {
        return cacheService;
    }

    public void setCacheService(ICacheService cacheService) {
        this.cacheService = cacheService;
    }

}

Starter Class of second member of the cluster :

package com.onlinetechvision.exe;

import java.util.Set;
import java.util.concurrent.ExecutionException;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.Member;
import com.onlinetechvision.cache.srv.ICacheService;
import com.onlinetechvision.customer.Customer;
import com.onlinetechvision.exception.AnotherAvailableMemberNotFoundException;
import com.onlinetechvision.executor.srv.IDistributedExecutorService;
import com.onlinetechvision.task.TestCallable;

/**
 * Starter Class loads Customers to cache and executes distributed tasks.
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Starter {

    private String hazelcastInstanceName;
    private Hazelcast hazelcast;
    private IDistributedExecutorService distributedExecutorService;
    private ICacheService cacheService;

    /**
     * Loads cache and executes the tasks
     *
     */
    public void start() {
        loadCache();
        executeTasks();
    }

    /**
     * Loads Customers to cache
     *
     */
    public void loadCache() {
        Customer firstCustomer = new Customer();
        firstCustomer.setId("3");
        firstCustomer.setName("Bruce");
        firstCustomer.setSurname("Willis");

        Customer secondCustomer = new Customer();
        secondCustomer.setId("4");
        secondCustomer.setName("Colin");
        secondCustomer.setSurname("Farrell");

        getCacheService().addToCache(firstCustomer.getId(), firstCustomer);
        getCacheService().addToCache(secondCustomer.getId(), secondCustomer);
    }

    /**
     * Executes Tasks
     *
     */
    public void executeTasks() {
        try {
            getDistributedExecutorService().executeOnStatedMember(new TestCallable(), getAnotherMember());
            getDistributedExecutorService().executeOnTheMemberOwningTheKey(new TestCallable(), "3");
            getDistributedExecutorService().executeOnAnyMember(new TestCallable());
            getDistributedExecutorService().executeOnMembers(new TestCallable(), getAllMembers());
        } catch (InterruptedException | ExecutionException | AnotherAvailableMemberNotFoundException e) {
            e.printStackTrace();
        }
    }

    /**
     * Gets cluster members
     *
     * @return Set<Member> Set of Cluster Members
     *
     */
    private Set<Member> getAllMembers() {
        Set<Member> members = getHazelcastLocalInstance().getCluster().getMembers();
        return members;
    }

    /**
     * Gets another member of the cluster
     *
     * @return Member Another Member of Cluster
     * @throws AnotherAvailableMemberNotFoundException if no other available member is found
     */
    private Member getAnotherMember() throws AnotherAvailableMemberNotFoundException {
        Set<Member> members = getAllMembers();
        for (Member member : members) {
            if (!member.localMember()) {
                return member;
            }
        }
        throw new AnotherAvailableMemberNotFoundException("No Other Available Member on the cluster. Please be aware that all members are active on the cluster");
    }

    /**
     * Gets Hazelcast local instance
     *
     * @return HazelcastInstance Hazelcast local instance
     */
    @SuppressWarnings("static-access")
    private HazelcastInstance getHazelcastLocalInstance() {
        HazelcastInstance instance = getHazelcast().getHazelcastInstanceByName(getHazelcastInstanceName());
        return instance;
    }

    public String getHazelcastInstanceName() {
        return hazelcastInstanceName;
    }

    public void setHazelcastInstanceName(String hazelcastInstanceName) {
        this.hazelcastInstanceName = hazelcastInstanceName;
    }

    public Hazelcast getHazelcast() {
        return hazelcast;
    }

    public void setHazelcast(Hazelcast hazelcast) {
        this.hazelcast = hazelcast;
    }

    public IDistributedExecutorService getDistributedExecutorService() {
        return distributedExecutorService;
    }

    public void setDistributedExecutorService(IDistributedExecutorService distributedExecutorService) {
        this.distributedExecutorService = distributedExecutorService;
    }

    public ICacheService getCacheService() {
        return cacheService;
    }

    public void setCacheService(ICacheService cacheService) {
        this.cacheService = cacheService;
    }

}
STEP 12 : CREATE hazelcast-config.properties FILE
The hazelcast-config.properties file holds the properties of the cluster members. The two members differ only in instance name and network port.
First member properties :

hz.instance.name = OTVInstance1
hz.group.name = dev
hz.group.password = dev
hz.management.center.enabled = true
hz.management.center.url = http://localhost:8080/mancenter
hz.network.port = 5701
hz.network.port.auto.increment = false
hz.tcp.ip.enabled = true
hz.members = 192.168.1.32
hz.executor.service.core.pool.size = 2
hz.executor.service.max.pool.size = 30
hz.executor.service.keep.alive.seconds = 30
hz.map.backup.count=2
hz.map.max.size=0
hz.map.eviction.percentage=30
hz.map.read.backup.data=true
hz.map.cache.value=true
hz.map.eviction.policy=NONE
hz.map.merge.policy=hz.ADD_NEW_ENTRY

Second member properties :

hz.instance.name = OTVInstance2
hz.group.name = dev
hz.group.password = dev
hz.management.center.enabled = true
hz.management.center.url = http://localhost:8080/mancenter
hz.network.port = 5702
hz.network.port.auto.increment = false
hz.tcp.ip.enabled = true
hz.members = 192.168.1.32
hz.executor.service.core.pool.size = 2
hz.executor.service.max.pool.size = 30
hz.executor.service.keep.alive.seconds = 30
hz.map.backup.count=2
hz.map.max.size=0
hz.map.eviction.percentage=30
hz.map.read.backup.data=true
hz.map.cache.value=true
hz.map.eviction.policy=NONE
hz.map.merge.policy=hz.ADD_NEW_ENTRY
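Because both members list the same address in hz.members and port auto-increment is switched off, each member needs its own hz.network.port. The sketch below shows one way such a properties file could be parsed and checked with the JDK's java.util.Properties. It is an illustration only: in this project the values are resolved by Spring, and MemberConfigDemo, parse, port and samePort are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical demo class; the article itself lets Spring resolve these properties. */
class MemberConfigDemo {

    /** Parses properties-file text such as the member configurations above. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    /** Reads the member's network port as a number. */
    static int port(Properties props) {
        return Integer.parseInt(props.getProperty("hz.network.port").trim());
    }

    /** Returns true when two member configurations would try to bind the same port. */
    static boolean samePort(Properties first, Properties second) {
        return port(first) == port(second);
    }

    public static void main(String[] args) {
        // Only the two keys that differ between the members are repeated here.
        Properties first = parse("hz.instance.name = OTVInstance1\nhz.network.port = 5701\n");
        Properties second = parse("hz.instance.name = OTVInstance2\nhz.network.port = 5702\n");

        System.out.println(first.getProperty("hz.instance.name") + " uses port " + port(first));
        System.out.println(second.getProperty("hz.instance.name") + " uses port " + port(second));
        System.out.println("Port conflict: " + samePort(first, second));
    }
}
```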
STEP 13 : CREATE applicationContext-hazelcast.xml
The Spring Hazelcast configuration file, applicationContext-hazelcast.xml, is created, and the Hazelcast Distributed Executor Service and the Hazelcast instance are configured in it.

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:hz="http://www.hazelcast.com/schema/spring"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.hazelcast.com/schema/spring
           http://www.hazelcast.com/schema/spring/hazelcast-spring-2.4.xsd">

    <hz:map id="customerMap" name="customerMap" instance-ref="instance"/>

    <!-- Hazelcast Distributed Executor Service definition -->
    <hz:executorService id="hazelcastDistributedExecutorService"
                        instance-ref="instance"
                        name="hazelcastDistributedExecutorService" />

    <!-- Hazelcast Instance configuration -->
    <hz:hazelcast id="instance">
        <hz:config>

            <!-- Hazelcast Instance Name -->
            <hz:instance-name>${hz.instance.name}</hz:instance-name>

            <!-- Hazelcast Group Name and Password -->
            <hz:group name="${hz.group.name}" password="${hz.group.password}"/>

            <!-- Hazelcast Management Center URL -->
            <hz:management-center enabled="${hz.management.center.enabled}"
                                  url="${hz.management.center.url}"/>

            <!-- Hazelcast Tcp based network configuration -->
            <hz:network port="${hz.network.port}"
                        port-auto-increment="${hz.network.port.auto.increment}">
                <hz:join>
                    <hz:tcp-ip enabled="${hz.tcp.ip.enabled}">
                        <hz:members>${hz.members}</hz:members>
                    </hz:tcp-ip>
                </hz:join>
            </hz:network>

            <!-- Hazelcast Distributed Executor Service configuration -->
            <hz:executor-service name="executorService"
                                 core-pool-size="${hz.executor.service.core.pool.size}"
                                 max-pool-size="${hz.executor.service.max.pool.size}"
                                 keep-alive-seconds="${hz.executor.service.keep.alive.seconds}"/>

            <!-- Hazelcast Distributed Map configuration -->
            <hz:map name="map"
                    backup-count="${hz.map.backup.count}"
                    max-size="${hz.map.max.size}"
                    eviction-percentage="${hz.map.eviction.percentage}"
                    read-backup-data="${hz.map.read.backup.data}"
                    cache-value="${hz.map.cache.value}"
                    eviction-policy="${hz.map.eviction.policy}"
                    merge-policy="${hz.map.merge.policy}" />

        </hz:config>
    </hz:hazelcast>

</beans>
STEP 14 : CREATE applicationContext.xml
The Spring configuration file, applicationContext.xml, is created.

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:hz="http://www.hazelcast.com/schema/spring"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">

    <import resource="classpath:applicationContext-hazelcast.xml" />

    <!-- Beans Declaration -->
    <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <list>
                <value>classpath:/hazelcast-config.properties</value>
            </list>
        </property>
    </bean>

    <bean id="cacheService" class="com.onlinetechvision.cache.srv.CacheService">
        <constructor-arg ref="customerMap"/>
    </bean>

    <bean id="distributedExecutorService" class="com.onlinetechvision.executor.srv.DistributedExecutorService">
        <property name="hazelcastDistributedExecutorService" ref="hazelcastDistributedExecutorService" />
    </bean>

    <bean id="hazelcast" class="com.hazelcast.core.Hazelcast"/>

    <bean id="starter" class="com.onlinetechvision.exe.Starter">
        <property name="hazelcastInstanceName" value="${hz.instance.name}" />
        <property name="hazelcast" ref="hazelcast" />
        <property name="distributedExecutorService" ref="distributedExecutorService" />
        <property name="cacheService" ref="cacheService" />
    </bean>

</beans>
STEP 15 : CREATE Application CLASS
The Application class is created to run the application.

package com.onlinetechvision.exe;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Application class starts the application
 *
 * @author onlinetechvision.com
 * @since 27 Nov 2012
 * @version 1.0.0
 *
 */
public class Application {

    /**
     * Starts the application
     *
     * @param args command-line arguments
     *
     */
    public static void main(String[] args) {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        Starter starter = (Starter) context.getBean("starter");
        starter.start();
    }

}
STEP 16 : BUILD PROJECT
After the OTV_Spring_Hazelcast_DistributedExecution project is built, OTV_Spring_Hazelcast_DistributedExecution-0.0.1-SNAPSHOT.jar is created.
Important Note : The members of the cluster have different Hazelcast configurations, so the project should be built separately for each member.
STEP 17 : INTEGRATION with HAZELCAST MANAGEMENT CENTER
Hazelcast Management Center makes it possible to monitor and manage the nodes in the cluster.
The entry and backup counts owned by customerMap can be seen via the Map Memory Data Table. We have distributed 4 entries via customerMap as shown below :
Sample keys and values can be seen via Map Browser :
The details of hazelcastDistributedExecutorService can be seen via the Executors tab. We have executed 3 tasks on the first member and 2 tasks on the second member as shown below :
STEP 18 : RUN PROJECT BY STARTING THE CLUSTER’S MEMBERS
After the created OTV_Spring_Hazelcast_DistributedExecution-0.0.1-SNAPSHOT.jar file is run on the cluster’s members, the following console output is shown :
First member console output :
Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker
INFO: Interfaces is disabled, trying to pick one address from TCP-IP config addresses: [x.y.z.t]
Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker
INFO: Prefer IPv4 stack is true.
Kas 25, 2012 4:07:20 PM com.hazelcast.impl.AddressPicker
INFO: Picked Address[x.y.z.t]:5701, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5701], bind any local is true
Kas 25, 2012 4:07:21 PM com.hazelcast.system
INFO: [x.y.z.t]:5701 [dev] Hazelcast Community Edition 2.4 (20121017) starting at Address[x.y.z.t]:5701
Kas 25, 2012 4:07:21 PM com.hazelcast.system
INFO: [x.y.z.t]:5701 [dev] Copyright (C) 2008-2012 Hazelcast.com
Kas 25, 2012 4:07:21 PM com.hazelcast.impl.LifecycleServiceImpl
INFO: [x.y.z.t]:5701 [dev] Address[x.y.z.t]:5701 is STARTING
Kas 25, 2012 4:07:24 PM com.hazelcast.impl.TcpIpJoiner
INFO: [x.y.z.t]:5701 [dev]
--A new cluster is created and First Member joins the cluster.
Members [1] {
    Member [x.y.z.t]:5701 this
}
Kas 25, 2012 4:07:24 PM com.hazelcast.impl.MulticastJoiner
INFO: [x.y.z.t]:5701 [dev]
Members [1] {
    Member [x.y.z.t]:5701 this
}
...
-- First member adds two new entries to the cache...
EntryAdded... Member : Member [x.y.z.t]:5701 this, Key : 1, OldValue : null, NewValue : Customer [id=1, name=Jodie, surname=Foster]
EntryAdded... Member : Member [x.y.z.t]:5701 this, Key : 2, OldValue : null, NewValue : Customer [id=2, name=Kate, surname=Winslet]
...
--Second Member joins the cluster.
Members [2] {
    Member [x.y.z.t]:5701 this
    Member [x.y.z.t]:5702
}
...
-- Second member adds two new entries to the cache...
EntryAdded... Member : Member [x.y.z.t]:5702, Key : 4, OldValue : null, NewValue : Customer [id=4, name=Colin, surname=Farrell]
EntryAdded... Member : Member [x.y.z.t]:5702, Key : 3, OldValue : null, NewValue : Customer [id=3, name=Bruce, surname=Willis]
Second member console output :
Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker
INFO: Interfaces is disabled, trying to pick one address from TCP-IP config addresses: [x.y.z.t]
Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker
INFO: Prefer IPv4 stack is true.
Kas 25, 2012 4:07:48 PM com.hazelcast.impl.AddressPicker
INFO: Picked Address[x.y.z.t]:5702, using socket ServerSocket[addr=/0:0:0:0:0:0:0:0,localport=5702], bind any local is true
Kas 25, 2012 4:07:49 PM com.hazelcast.system
INFO: [x.y.z.t]:5702 [dev] Hazelcast Community Edition 2.4 (20121017) starting at Address[x.y.z.t]:5702
Kas 25, 2012 4:07:49 PM com.hazelcast.system
INFO: [x.y.z.t]:5702 [dev] Copyright (C) 2008-2012 Hazelcast.com
Kas 25, 2012 4:07:49 PM com.hazelcast.impl.LifecycleServiceImpl
INFO: [x.y.z.t]:5702 [dev] Address[x.y.z.t]:5702 is STARTING
Kas 25, 2012 4:07:49 PM com.hazelcast.impl.Node
INFO: [x.y.z.t]:5702 [dev] ** setting master address to Address[x.y.z.t]:5701
Kas 25, 2012 4:07:49 PM com.hazelcast.impl.MulticastJoiner
INFO: [x.y.z.t]:5702 [dev] Connecting to master node: Address[x.y.z.t]:5701
Kas 25, 2012 4:07:49 PM com.hazelcast.nio.ConnectionManager
INFO: [x.y.z.t]:5702 [dev] 55715 accepted socket connection from /x.y.z.t:5701
Kas 25, 2012 4:07:55 PM com.hazelcast.cluster.ClusterManager
INFO: [x.y.z.t]:5702 [dev]
--Second Member joins the cluster.
Members [2] {
    Member [x.y.z.t]:5701
    Member [x.y.z.t]:5702 this
}
Kas 25, 2012 4:07:56 PM com.hazelcast.impl.LifecycleServiceImpl
INFO: [x.y.z.t]:5702 [dev] Address[x.y.z.t]:5702 is STARTED
-- Second member adds two new entries to the cache...
EntryAdded... Member : Member [x.y.z.t]:5702 this, Key : 3, OldValue : null, NewValue : Customer [id=3, name=Bruce, surname=Willis]
EntryAdded... Member : Member [x.y.z.t]:5702 this, Key : 4, OldValue : null, NewValue : Customer [id=4, name=Colin, surname=Farrell]
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:42) - Method executeOnStatedMember is called...
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:46) - Result of method executeOnStatedMember is : First Member's TestCallable Task is called...
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:61) - Method executeOnTheMemberOwningTheKey is called...
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:65) - Result of method executeOnTheMemberOwningTheKey is : First Member's TestCallable Task is called...
25.11.2012 16:07:56 DEBUG (DistributedExecutorService.java:78) - Method executeOnAnyMember is called...
25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:82) - Result of method executeOnAnyMember is : Second Member's TestCallable Task is called...
25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:96) - Method executeOnMembers is called...
25.11.2012 16:07:57 DEBUG (DistributedExecutorService.java:101) - Result of method executeOnMembers is : [First Member's TestCallable Task is called..., Second Member's TestCallable Task is called...]
STEP 19 : DOWNLOAD
https://github.com/erenavsarogullari/OTV_Spring_Hazelcast_DistributedExecution
Related Links :
Java ExecutorService Interface
Hazelcast Distributed Executor Service
Reference: Hazelcast Distributed Execution with Spring from our JCG partner Eren Avsarogullari at the Online Technology Vision blog.