Distribute Spring Beans in Oracle Coherence
Coherence supports a lock-free programming model through the EntryProcessor API. This feature improves system performance by reducing network access and by performing an implicit low-level lock on the entries. This implicit low-level locking is different from the explicit lock(key) provided by the ConcurrentMap API.
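As a rough single-JVM analogy (not the Coherence API), the difference between the two styles can be sketched with java.util.concurrent. ConcurrentHashMap.compute applies a function atomically to one entry, much as an entry processor is applied to a single cache entry. The class and method names below are hypothetical, and the sketch assumes Java 8 or later.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Single-JVM analogy only; this is not the Coherence API.
class EntryUpdateSketch {

    private final ConcurrentMap<String, Integer> cache = new ConcurrentHashMap<>();
    private final ReentrantLock explicitLock = new ReentrantLock();

    // Explicit-lock style: the caller acquires a lock, reads, writes, then unlocks.
    int incrementWithExplicitLock(String key) {
        explicitLock.lock();
        try {
            Integer current = cache.get(key);
            int next = (current == null) ? 1 : current + 1;
            cache.put(key, next);
            return next;
        } finally {
            explicitLock.unlock();
        }
    }

    // Processor style: the update logic is handed to the map, which applies it
    // atomically to the one entry it concerns.
    int incrementWithEntryFunction(String key) {
        return cache.compute(key, (k, v) -> (v == null) ? 1 : v + 1);
    }

    public static void main(String[] args) throws InterruptedException {
        final EntryUpdateSketch sketch = new EntryUpdateSketch();
        Runnable work = () -> {
            for (int i = 0; i < 1000; i++) {
                sketch.incrementWithEntryFunction("hits");
            }
        };
        Thread first = new Thread(work);
        Thread second = new Thread(work);
        first.start();
        second.start();
        first.join();
        second.join();
        // No update is lost even though the caller never locked anything.
        System.out.println(sketch.cache.get("hits")); // prints 2000
    }
}
```

In the processor style the caller never sees a lock; the map decides how to protect the entry while the function runs.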
Besides entry processors, Coherence offers other transaction options: explicit locking, the Transaction Framework API and the Coherence Resource Adapter. For detailed information about these options, please see the Further Reading section. In addition, the article Distributed Data Management in Oracle Coherence shows an explicit locking implementation.
Portable Object Format (POF) is a platform-independent serialization format. It allows equivalent Java, .NET and C++ objects to be encoded into an identical sequence of bytes. POF is recommended for performance, since it serializes and deserializes faster than standard Java serialization. (According to the Coherence reference document, in a simple test class with a String, a long and three ints, (de)serialization was seven times faster than with standard Java serialization.)
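To give a feel for why a format built on numeric type ids and positional fields can be cheaper than standard Java serialization, the following sketch compares encoded sizes. It is an illustration only: the hand-written encoding is not the POF wire format, and SimpleUser, javaSerialize and compactSerialize are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Illustration only: the compact encoding below is NOT the POF wire format.
class SerializationSizeSketch {

    // Hypothetical value class with the same three fields as the User bean.
    static class SimpleUser implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final String name;
        final String surname;

        SimpleUser(String id, String name, String surname) {
            this.id = id;
            this.name = name;
            this.surname = surname;
        }
    }

    // Standard Java serialization: the stream carries a class descriptor
    // (class name, field names and types) in addition to the field values.
    static byte[] javaSerialize(SimpleUser user) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes);
        out.writeObject(user);
        out.close();
        return bytes.toByteArray();
    }

    // Hand-written encoding: a numeric type id followed by the fields in a fixed order.
    static byte[] compactSerialize(SimpleUser user) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeShort(1001);      // plays the role of a <type-id> entry
        out.writeUTF(user.id);     // property 0
        out.writeUTF(user.name);   // property 1
        out.writeUTF(user.surname); // property 2
        out.close();
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        SimpleUser user = new SimpleUser("1", "Bruce", "Willis");
        System.out.println("Java serialization bytes : " + javaSerialize(user).length);
        System.out.println("Compact encoding bytes   : " + compactSerialize(user).length);
    }
}
```

The size difference comes mostly from the class metadata that standard serialization writes into every stream.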
Coherence offers many cache types, such as Distributed (or Partitioned), Replicated, Optimistic, Near, Local and Remote Cache. A distributed cache is a collection of data that is distributed (or partitioned) across any number of cluster nodes such that exactly one node in the cluster is responsible for each piece of data in the cache, and the responsibility is distributed (or load-balanced) among the cluster nodes. Please note that the distributed cache type is used in this article. The other cache types are outside the scope of this article; please see the Further Reading section or the Coherence reference document. Their configurations are very similar to the distributed cache configuration.
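The "exactly one node is responsible for each piece of data" rule can be pictured with a small sketch. This is a simplified model under stated assumptions: keys are hashed to a fixed number of partitions and partitions are assigned round-robin to members. Coherence's actual partition assignment and backup handling are more involved, and the class name and the partition count of 257 are chosen here only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model; Coherence's real partition assignment differs.
class PartitionOwnershipSketch {

    private final List<String> members;
    private final int partitionCount;

    PartitionOwnershipSketch(List<String> members, int partitionCount) {
        this.members = new ArrayList<String>(members);
        this.partitionCount = partitionCount;
    }

    // Every key maps to exactly one partition.
    int partitionOf(Object key) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Every partition is owned by exactly one member (round-robin in this sketch).
    String ownerOf(Object key) {
        return members.get(partitionOf(key) % members.size());
    }

    public static void main(String[] args) {
        PartitionOwnershipSketch cluster =
                new PartitionOwnershipSketch(Arrays.asList("OTV1", "OTV2"), 257);
        System.out.println("key 1 -> " + cluster.ownerOf("1")); // prints key 1 -> OTV2
        System.out.println("key 2 -> " + cluster.ownerOf("2")); // prints key 2 -> OTV1
    }
}
```

Because ownership is a pure function of the key, any member can work out where a request for that key has to go.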
To compare the two implementations (EntryProcessor with Portable Object Format versus explicit locking with standard Java serialization), see the article How to distribute Spring Beans by using Coherence, which covers the explicit locking and standard Java serialization variant.
In this article, a new cluster named OTV has been created and a Spring bean has been distributed by using a cache object named user-cache. It has been distributed between two members of the cluster.
Let us look at an implementation that uses AbstractProcessor (which implements the EntryProcessor interface) and the PortableObject interface to distribute Spring beans between JVMs in a cluster.
Used Technologies :
JDK 1.6.0_31
Spring 3.1.1
Coherence 3.7.0
SolarisOS 5.10
Maven 3.0.2
STEP 1 : CREATE MAVEN PROJECT
A Maven project is created. (It can be created by using Maven or an IDE plug-in.)
STEP 2 : DOWNLOAD COHERENCE
Coherence is downloaded via the Coherence package.
STEP 3 : LIBRARIES
First, the Spring dependencies are added to Maven's pom.xml. Please note that the Coherence library is installed into the local Maven repository and its description is added to pom.xml as follows. If Maven is not used, the coherence.jar file can be added to the classpath instead.
    <properties>
        <spring.version>3.1.1.RELEASE</spring.version>
    </properties>

    <dependencies>

        <!-- Spring 3 dependencies -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <!-- Coherence library (from local repository) -->
        <dependency>
            <groupId>com.tangosol</groupId>
            <artifactId>coherence</artifactId>
            <version>3.7.0</version>
        </dependency>

        <!-- Log4j library -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>

    </dependencies>
The following Maven plugin can be used to create a runnable JAR.
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>1.3.1</version>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>com.otv.exe.Application</mainClass>
                        </transformer>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                            <resource>META-INF/spring.handlers</resource>
                        </transformer>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                            <resource>META-INF/spring.schemas</resource>
                        </transformer>
                    </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>
STEP 4 : CREATE otv-pof-config.xml
otv-pof-config.xml lists the classes that use Portable Object Format (POF) for serialization. In this example, the User, UpdateUserProcessor and DeleteUserProcessor classes implement the com.tangosol.io.pof.PortableObject interface.
The -Dtangosol.pof.config argument can be used in the startup script to define the path of otv-pof-config.xml.
    <?xml version="1.0"?>
    <!DOCTYPE pof-config SYSTEM "pof-config.dtd">
    <pof-config>
        <user-type-list>
            <!-- coherence POF user types -->
            <include>coherence-pof-config.xml</include>
            <!-- The definition of classes which use Portable Object Format -->
            <user-type>
                <type-id>1001</type-id>
                <class-name>com.otv.user.User</class-name>
            </user-type>
            <user-type>
                <type-id>1002</type-id>
                <class-name>com.otv.user.processor.UpdateUserProcessor</class-name>
            </user-type>
            <user-type>
                <type-id>1003</type-id>
                <class-name>com.otv.user.processor.DeleteUserProcessor</class-name>
            </user-type>
        </user-type-list>
        <allow-interfaces>true</allow-interfaces>
        <allow-subclasses>true</allow-subclasses>
    </pof-config>
STEP 5 : CREATE otv-coherence-cache-config.xml
otv-coherence-cache-config.xml contains the caching-schemes (distributed or replicated) and caching-scheme-mapping configuration. The cache configuration created here should be added to coherence-cache-config.xml.

    <?xml version="1.0"?>
    <cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                  xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
                  xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config
                                      coherence-cache-config.xsd">

        <caching-scheme-mapping>
            <cache-mapping>
                <cache-name>user-cache</cache-name>
                <scheme-name>UserDistributedCacheWithPof</scheme-name>
            </cache-mapping>
        </caching-scheme-mapping>

        <caching-schemes>
            <distributed-scheme>
                <scheme-name>UserDistributedCacheWithPof</scheme-name>
                <service-name>UserDistributedCacheWithPof</service-name>

                <serializer>
                    <instance>
                        <class-name>com.tangosol.io.pof.SafeConfigurablePofContext</class-name>
                        <init-params>
                            <init-param>
                                <param-type>String</param-type>
                                <!-- pof-config.xml path should be set -->
                                <param-value>otv-pof-config.xml</param-value>
                            </init-param>
                        </init-params>
                    </instance>
                </serializer>

                <backing-map-scheme>
                    <local-scheme />
                </backing-map-scheme>

                <autostart>true</autostart>
            </distributed-scheme>
        </caching-schemes>

    </cache-config>
STEP 6 : CREATE tangosol-coherence-override.xml
tangosol-coherence-override.xml contains the cluster, member-identity and configurable-cache-factory configuration. The first configuration file below belongs to the first member of the cluster. The -Dtangosol.coherence.override argument can be used in the startup script to define the path of tangosol-coherence-override.xml.
tangosol-coherence-override.xml for first member of the cluster :
    <?xml version='1.0'?>
    <coherence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
               xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config
                                   coherence-operational-config.xsd">

        <cluster-config>

            <member-identity>
                <cluster-name>OTV</cluster-name>
                <!-- Name of the first member of the cluster -->
                <role-name>OTV1</role-name>
            </member-identity>

            <unicast-listener>
                <well-known-addresses>
                    <socket-address id="1">
                        <!-- IP Address of the first member of the cluster -->
                        <address>x.x.x.x</address>
                        <port>8089</port>
                    </socket-address>
                    <socket-address id="2">
                        <!-- IP Address of the second member of the cluster -->
                        <address>y.y.y.y</address>
                        <port>8089</port>
                    </socket-address>
                </well-known-addresses>

                <!-- Name of the first member of the cluster -->
                <machine-id>OTV1</machine-id>
                <!-- IP Address of the first member of the cluster -->
                <address>x.x.x.x</address>
                <port>8089</port>
                <port-auto-adjust>true</port-auto-adjust>
            </unicast-listener>

        </cluster-config>

        <configurable-cache-factory-config>
            <init-params>
                <init-param>
                    <param-type>java.lang.String</param-type>
                    <!-- coherence-cache-config.xml path should be set -->
                    <param-value system-property="tangosol.coherence.cacheconfig">otv-coherence-cache-config.xml</param-value>
                </init-param>
            </init-params>
        </configurable-cache-factory-config>

    </coherence>
tangosol-coherence-override.xml for second member of the cluster :
    <?xml version='1.0'?>
    <coherence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
               xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config
                                   coherence-operational-config.xsd">

        <cluster-config>

            <member-identity>
                <cluster-name>OTV</cluster-name>
                <!-- Name of the second member of the cluster -->
                <role-name>OTV2</role-name>
            </member-identity>

            <unicast-listener>
                <well-known-addresses>
                    <socket-address id="1">
                        <!-- IP Address of the first member of the cluster -->
                        <address>x.x.x.x</address>
                        <port>8089</port>
                    </socket-address>
                    <socket-address id="2">
                        <!-- IP Address of the second member of the cluster -->
                        <address>y.y.y.y</address>
                        <port>8089</port>
                    </socket-address>
                </well-known-addresses>

                <!-- Name of the second member of the cluster -->
                <machine-id>OTV2</machine-id>
                <!-- IP Address of the second member of the cluster -->
                <address>y.y.y.y</address>
                <port>8089</port>
                <port-auto-adjust>true</port-auto-adjust>
            </unicast-listener>

        </cluster-config>

        <configurable-cache-factory-config>
            <init-params>
                <init-param>
                    <param-type>java.lang.String</param-type>
                    <!-- coherence-cache-config.xml path should be set -->
                    <param-value system-property="tangosol.coherence.cacheconfig">otv-coherence-cache-config.xml</param-value>
                </init-param>
            </init-params>
        </configurable-cache-factory-config>

    </coherence>
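The -D arguments mentioned in Steps 4 and 6 are ordinary JVM system properties, so the same values could also be set from Java code, provided this happens before the first Coherence class is used. The following is a minimal sketch of that idea; the class and method names are hypothetical, while the property names are the ones used in this article.

```java
// Sketch: sets the configuration locations programmatically instead of via -D arguments.
// It only sets system properties; it does not start Coherence.
class CoherenceConfigPropertiesSketch {

    static void applyConfigLocations(String overridePath,
                                     String cacheConfigPath,
                                     String pofConfigPath) {
        System.setProperty("tangosol.coherence.override", overridePath);
        System.setProperty("tangosol.coherence.cacheconfig", cacheConfigPath);
        System.setProperty("tangosol.pof.config", pofConfigPath);
    }

    public static void main(String[] args) {
        // Assumption: the three files are on the classpath under these names.
        applyConfigLocations("tangosol-coherence-override.xml",
                             "otv-coherence-cache-config.xml",
                             "otv-pof-config.xml");
        System.out.println("override    = " + System.getProperty("tangosol.coherence.override"));
        System.out.println("cacheconfig = " + System.getProperty("tangosol.coherence.cacheconfig"));
        System.out.println("pof config  = " + System.getProperty("tangosol.pof.config"));
    }
}
```

Passing the values with -D in the startup script keeps the paths out of the code, which is usually easier when each member needs a different override file.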
STEP 7 : CREATE applicationContext.xml
applicationContext.xml is created.
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
                               http://www.springframework.org/schema/beans/spring-beans.xsd">

        <!-- Beans Declaration -->
        <bean id="User" class="com.otv.user.User" scope="prototype" />

        <bean id="UserCacheService" class="com.otv.user.cache.srv.UserCacheService" />

        <bean id="CacheUpdaterTask" class="com.otv.cache.updater.task.CacheUpdaterTask">
            <property name="userCacheService" ref="UserCacheService" />
        </bean>

    </beans>
STEP 8 : CREATE SystemConstants CLASS
SystemConstants Class is created. This class covers all system constants.
    package com.otv.common;

    /**
     * System Constants
     *
     * @author onlinetechvision.com
     * @since 2 Jun 2012
     * @version 1.0.0
     *
     */
    public class SystemConstants {

        public static final String APPLICATION_CONTEXT_FILE_NAME = "applicationContext.xml";

        //Named Cache Definition...
        public static final String USER_CACHE = "user-cache";

        //Bean Names...
        public static final String BEAN_NAME_CACHE_UPDATER_TASK = "CacheUpdaterTask";
        public static final String BEAN_NAME_USER = "User";

    }
STEP 9 : CREATE User BEAN
A new User Spring bean is created. This bean will be distributed between the two nodes in the OTV cluster. PortableObject is implemented for serialization. The PortableObject interface declares two methods, readExternal and writeExternal. Only the properties that are to be serialized need to be read and written in these methods. In this example, all the properties (id, name and surname of User) are serialized.
    package com.otv.user;

    import java.io.IOException;

    import com.tangosol.io.pof.PofReader;
    import com.tangosol.io.pof.PofWriter;
    import com.tangosol.io.pof.PortableObject;

    /**
     * User Bean
     *
     * @author onlinetechvision.com
     * @since 2 Jun 2012
     * @version 1.0.0
     *
     */
    public class User implements PortableObject {

        private String id;
        private String name;
        private String surname;

        /** Gets User Id */
        public String getId() {
            return id;
        }

        /** Sets User Id */
        public void setId(String id) {
            this.id = id;
        }

        /** Gets User Name */
        public String getName() {
            return name;
        }

        /** Sets User Name */
        public void setName(String name) {
            this.name = name;
        }

        /** Gets User Surname */
        public String getSurname() {
            return surname;
        }

        /** Sets User Surname */
        public void setSurname(String surname) {
            this.surname = surname;
        }

        @Override
        public String toString() {
            StringBuilder strBuilder = new StringBuilder();
            strBuilder.append("Id : ").append(id);
            strBuilder.append(", Name : ").append(name);
            strBuilder.append(", Surname : ").append(surname);
            return strBuilder.toString();
        }

        /**
         * Restore the contents of a user type instance by reading its state
         * using the specified PofReader object.
         *
         * @param in PofReader
         */
        public void readExternal(PofReader in) throws IOException {
            this.id = in.readString(0);
            this.name = in.readString(1);
            this.surname = in.readString(2);
        }

        /**
         * Save the contents of a POF user type instance by writing its state
         * using the specified PofWriter object.
         *
         * @param out PofWriter
         */
        public void writeExternal(PofWriter out) throws IOException {
            out.writeString(0, id);
            out.writeString(1, name);
            out.writeString(2, surname);
        }

    }
STEP 10 : CREATE IUserCacheService INTERFACE
A new IUserCacheService interface is created to perform cache operations.
    package com.otv.user.cache.srv;

    import com.otv.user.User;
    import com.otv.user.processor.DeleteUserProcessor;
    import com.otv.user.processor.UpdateUserProcessor;
    import com.tangosol.net.NamedCache;

    /**
     * User Cache Service Interface
     *
     * @author onlinetechvision.com
     * @since 2 Jun 2012
     * @version 1.0.0
     *
     */
    public interface IUserCacheService {

        /**
         * Gets Distributed User Cache
         *
         * @return NamedCache User Cache
         */
        NamedCache getUserCache();

        /**
         * Adds user to cache
         *
         * @param user User
         */
        void addUser(User user);

        /**
         * Updates user on the cache
         *
         * @param userId String
         * @param processor UpdateUserProcessor
         */
        void updateUser(String userId, UpdateUserProcessor processor);

        /**
         * Deletes user from the cache
         *
         * @param userId String
         * @param processor DeleteUserProcessor
         */
        void deleteUser(String userId, DeleteUserProcessor processor);

    }
STEP 11 : CREATE UserCacheService CLASS
UserCacheService Class is created by implementing IUserCacheService Interface.
    package com.otv.user.cache.srv;

    import com.otv.cache.listener.UserMapListener;
    import com.otv.common.SystemConstants;
    import com.otv.user.User;
    import com.otv.user.processor.DeleteUserProcessor;
    import com.otv.user.processor.UpdateUserProcessor;
    import com.tangosol.net.CacheFactory;
    import com.tangosol.net.NamedCache;

    /**
     * User Cache Service
     *
     * @author onlinetechvision.com
     * @since 2 Jun 2012
     * @version 1.0.0
     *
     */
    public class UserCacheService implements IUserCacheService {

        private NamedCache userCache = null;

        public UserCacheService() {
            setUserCache(CacheFactory.getCache(SystemConstants.USER_CACHE));
            //UserMap Listener is registered to listen user-cache operations
            getUserCache().addMapListener(new UserMapListener());
        }

        /**
         * Adds user to cache
         *
         * @param user User
         */
        public void addUser(User user) {
            getUserCache().put(user.getId(), user);
        }

        /**
         * Deletes user from the cache
         *
         * @param userId String
         * @param processor DeleteUserProcessor
         */
        public void deleteUser(String userId, DeleteUserProcessor processor) {
            getUserCache().invoke(userId, processor);
        }

        /**
         * Updates user on the cache
         *
         * @param userId String
         * @param processor UpdateUserProcessor
         */
        public void updateUser(String userId, UpdateUserProcessor processor) {
            getUserCache().invoke(userId, processor);
        }

        /** Gets Distributed User Cache */
        public NamedCache getUserCache() {
            return userCache;
        }

        /** Sets User Cache */
        public void setUserCache(NamedCache userCache) {
            this.userCache = userCache;
        }

    }
STEP 12 : CREATE UserMapListener CLASS
A new UserMapListener class is created. This listener receives distributed user-cache events.
    package com.otv.cache.listener;

    import org.apache.log4j.Logger;

    import com.tangosol.util.MapEvent;
    import com.tangosol.util.MapListener;

    /**
     * User Map Listener
     *
     * @author onlinetechvision.com
     * @since 2 Jun 2012
     * @version 1.0.0
     *
     */
    public class UserMapListener implements MapListener {

        private static Logger logger = Logger.getLogger(UserMapListener.class);

        /**
         * This method is invoked when an entry is deleted from the cache...
         *
         * @param me MapEvent
         */
        public void entryDeleted(MapEvent me) {
            logger.debug("Deleted Key = " + me.getKey() + ", Value = " + me.getOldValue());
        }

        /**
         * This method is invoked when an entry is inserted to the cache...
         *
         * @param me MapEvent
         */
        public void entryInserted(MapEvent me) {
            logger.debug("Inserted Key = " + me.getKey() + ", Value = " + me.getNewValue());
        }

        /**
         * This method is invoked when an entry is updated on the cache...
         *
         * @param me MapEvent
         */
        public void entryUpdated(MapEvent me) {
            logger.debug("Updated Key = " + me.getKey() + ", New_Value = " + me.getNewValue() + ", Old Value = " + me.getOldValue());
        }

    }
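The relationship between cache operations and the three callbacks can be illustrated with a small single-JVM sketch. It is not the Coherence MapListener API: the Listener interface and the ListenableMapSketch class are hypothetical, and Coherence additionally delivers such events across cluster members.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Single-JVM sketch of the listener idea; not the Coherence API.
class ListenableMapSketch {

    // Hypothetical callback interface mirroring the three event kinds used in this article.
    interface Listener {
        void entryInserted(String key, String newValue);
        void entryUpdated(String key, String oldValue, String newValue);
        void entryDeleted(String key, String oldValue);
    }

    private final Map<String, String> data = new HashMap<String, String>();
    private final List<Listener> listeners = new ArrayList<Listener>();

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    // A put on a new key raises "inserted"; a put on an existing key raises "updated".
    void put(String key, String value) {
        boolean existed = data.containsKey(key);
        String oldValue = data.put(key, value);
        for (Listener listener : listeners) {
            if (existed) {
                listener.entryUpdated(key, oldValue, value);
            } else {
                listener.entryInserted(key, value);
            }
        }
    }

    // Removing an existing key raises "deleted"; removing a missing key raises nothing.
    void remove(String key) {
        if (!data.containsKey(key)) {
            return;
        }
        String oldValue = data.remove(key);
        for (Listener listener : listeners) {
            listener.entryDeleted(key, oldValue);
        }
    }

    public static void main(String[] args) {
        ListenableMapSketch cache = new ListenableMapSketch();
        cache.addListener(new Listener() {
            public void entryInserted(String key, String newValue) {
                System.out.println("Inserted Key = " + key + ", Value = " + newValue);
            }
            public void entryUpdated(String key, String oldValue, String newValue) {
                System.out.println("Updated Key = " + key + ", New_Value = " + newValue
                        + ", Old Value = " + oldValue);
            }
            public void entryDeleted(String key, String oldValue) {
                System.out.println("Deleted Key = " + key + ", Value = " + oldValue);
            }
        });
        cache.put("1", "Bruce Willis");
        cache.put("1", "Client Eastwood");
        cache.remove("1");
    }
}
```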
STEP 13 : CREATE UpdateUserProcessor CLASS
AbstractProcessor is an abstract class in the com.tangosol.util.processor package. It implements the EntryProcessor interface.
The UpdateUserProcessor class is created to process the user update operation on the cache. When UpdateUserProcessor is invoked for a key, the member that holds the key is first located in the cluster. The processor is then executed on that member, where the value for the key (the User object) is updated. Network traffic is therefore reduced.
    package com.otv.user.processor;

    import java.io.IOException;

    import org.apache.log4j.Logger;

    import com.otv.user.User;
    import com.tangosol.io.pof.PofReader;
    import com.tangosol.io.pof.PofWriter;
    import com.tangosol.io.pof.PortableObject;
    import com.tangosol.util.InvocableMap.Entry;
    import com.tangosol.util.processor.AbstractProcessor;

    /**
     * Update User Processor
     *
     * @author onlinetechvision.com
     * @since 2 Jun 2012
     * @version 1.0.0
     *
     */
    public class UpdateUserProcessor extends AbstractProcessor implements PortableObject {

        private static Logger logger = Logger.getLogger(UpdateUserProcessor.class);

        private User newUser;

        /**
         * This empty constructor is added for Portable Object Format(POF).
         *
         */
        public UpdateUserProcessor() {

        }

        public UpdateUserProcessor(User newUser) {
            this.newUser = newUser;
        }

        /**
         * Processes a Map.Entry object.
         *
         * @param entry Entry
         * @return Object newUser
         */
        public Object process(Entry entry) {
            Object newValue = null;
            try {
                newValue = getNewUser();
                entry.setValue(newValue);

            } catch (Exception e) {
                logger.error("Error occurred when entry was being processed!", e);
            }

            return newValue;
        }

        /** Gets new user */
        public User getNewUser() {
            return newUser;
        }

        /** Sets new user */
        public void setNewUser(User newUser) {
            this.newUser = newUser;
        }

        /**
         * Restore the contents of a user type instance by reading its state
         * using the specified PofReader object.
         *
         * @param in PofReader
         */
        public void readExternal(PofReader in) throws IOException {
            setNewUser((User) in.readObject(0));
        }

        /**
         * Save the contents of a POF user type instance by writing its state
         * using the specified PofWriter object.
         *
         * @param out PofWriter
         */
        public void writeExternal(PofWriter out) throws IOException {
            out.writeObject(0, getNewUser());
        }

    }
STEP 14 : CREATE DeleteUserProcessor CLASS
The DeleteUserProcessor class is created to process the user deletion operation on the cache. When DeleteUserProcessor is invoked for a key, the member that holds the key is first located in the cluster. The processor is then executed on that member, so network traffic is reduced.
    package com.otv.user.processor;

    import java.io.IOException;

    import org.apache.log4j.Logger;

    import com.otv.user.User;
    import com.tangosol.io.pof.PofReader;
    import com.tangosol.io.pof.PofWriter;
    import com.tangosol.io.pof.PortableObject;
    import com.tangosol.util.InvocableMap.Entry;
    import com.tangosol.util.processor.AbstractProcessor;

    /**
     * Delete User Processor
     *
     * @author onlinetechvision.com
     * @since 2 Jun 2012
     * @version 1.0.0
     *
     */
    public class DeleteUserProcessor extends AbstractProcessor implements PortableObject {

        private static Logger logger = Logger.getLogger(DeleteUserProcessor.class);

        /**
         * Processes a Map.Entry object.
         *
         * @param entry Entry
         * @return Object user
         */
        public Object process(Entry entry) {
            User user = null;
            try {
                user = (User) entry.getValue();
                entry.remove(true);

            } catch (Exception e) {
                logger.error("Error occurred when entry was being processed!", e);
            }

            return user;
        }

        /**
         * Restore the contents of a user type instance by reading its state
         * using the specified PofReader object.
         *
         * @param in PofReader
         */
        public void readExternal(PofReader in) throws IOException {

        }

        /**
         * Save the contents of a POF user type instance by writing its state
         * using the specified PofWriter object.
         *
         * @param out PofWriter
         */
        public void writeExternal(PofWriter out) throws IOException {

        }

    }
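The reduction in network traffic described for both processors can be pictured with a small simulation. It is only a model under simplifying assumptions: every call on the hypothetical OwningMember class counts as one network request, and the explicit-lock path is assumed to need lock, get, put and unlock requests. None of these names come from Coherence.

```java
import java.util.HashMap;
import java.util.Map;

// Simulation only; request counts are a simplification, not measured Coherence behavior.
class ProcessorRoutingSketch {

    // Hypothetical stand-in for an entry processor; returning null removes the entry.
    interface Processor {
        String process(String currentValue);
    }

    // Hypothetical stand-in for the member that owns the key.
    static class OwningMember {
        private final Map<String, String> localData = new HashMap<String, String>();
        private int requests = 0;

        void lock(String key) { requests++; }
        void unlock(String key) { requests++; }
        String get(String key) { requests++; return localData.get(key); }
        void put(String key, String value) { requests++; localData.put(key, value); }

        // The processor travels to the data and runs next to it: one request in total.
        String invoke(String key, Processor processor) {
            requests++;
            String result = processor.process(localData.get(key));
            if (result == null) {
                localData.remove(key);
            } else {
                localData.put(key, result);
            }
            return result;
        }

        int requestCount() { return requests; }
    }

    static int requestsForExplicitLockUpdate(OwningMember owner, String key, String newValue) {
        int before = owner.requestCount();
        owner.lock(key);
        owner.get(key);
        owner.put(key, newValue);
        owner.unlock(key);
        return owner.requestCount() - before;
    }

    static int requestsForProcessorUpdate(OwningMember owner, String key, final String newValue) {
        int before = owner.requestCount();
        owner.invoke(key, new Processor() {
            public String process(String currentValue) {
                return newValue;
            }
        });
        return owner.requestCount() - before;
    }

    public static void main(String[] args) {
        OwningMember owner = new OwningMember();
        System.out.println(requestsForExplicitLockUpdate(owner, "1", "Bruce Willis")); // prints 4
        System.out.println(requestsForProcessorUpdate(owner, "1", "Client Eastwood")); // prints 1
    }
}
```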
STEP 15 : CREATE CacheUpdaterTask CLASS
CacheUpdaterTask Class is created to perform cache operations(add, update and delete) and monitor cache content.
    package com.otv.cache.updater.task;

    import java.util.Collection;

    import org.apache.log4j.Logger;
    import org.springframework.beans.BeansException;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.BeanFactoryAware;

    import com.otv.common.SystemConstants;
    import com.otv.user.User;
    import com.otv.user.cache.srv.IUserCacheService;
    import com.otv.user.processor.DeleteUserProcessor;
    import com.otv.user.processor.UpdateUserProcessor;

    /**
     * Cache Updater Task
     *
     * @author onlinetechvision.com
     * @since 2 Jun 2012
     * @version 1.0.0
     *
     */
    public class CacheUpdaterTask implements BeanFactoryAware, Runnable {

        private static Logger log = Logger.getLogger(CacheUpdaterTask.class);
        private IUserCacheService userCacheService;
        private BeanFactory beanFactory;

        public void run() {
            try {
                while (true) {
                    /**
                     * Before the project is built for the first member,
                     * this code block should be used instead of
                     * method processRequestsOnSecondMemberOfCluster.
                     */
                    processRequestsOnFirstMemberOfCluster();

                    /**
                     * Before the project is built for the second member,
                     * this code block should be used instead of
                     * method processRequestsOnFirstMemberOfCluster.
                     */
                    // processRequestsOnSecondMemberOfCluster();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        /**
         * Processes the cache requests on the first member of cluster...
         *
         * @throws InterruptedException
         */
        private void processRequestsOnFirstMemberOfCluster() throws InterruptedException {
            //Entry is added to cache...
            getUserCacheService().addUser(getUser("1", "Bruce", "Willis"));

            //Cache Entries are being printed...
            printCacheEntries();

            Thread.sleep(10000);

            User newUser = getUser("1", "Client", "Eastwood");
            //Existent Entry is updated on the cache...
            getUserCacheService().updateUser(newUser.getId(), new UpdateUserProcessor(newUser));

            //Cache Entries are being printed...
            printCacheEntries();

            Thread.sleep(10000);

            //Entry is deleted from cache...
            getUserCacheService().deleteUser(newUser.getId(), new DeleteUserProcessor());

            //Cache Entries are being printed...
            printCacheEntries();

            Thread.sleep(10000);
        }

        /**
         * Processes the cache requests on the second member of cluster...
         *
         * @throws InterruptedException
         */
        private void processRequestsOnSecondMemberOfCluster() throws InterruptedException {
            //Entry is added to cache...
            getUserCacheService().addUser(getUser("2", "Nathalie", "Portman"));

            Thread.sleep(15000);

            User newUser = getUser("2", "Sharon", "Stone");
            //Existent Entry is updated on the cache...
            getUserCacheService().updateUser(newUser.getId(), new UpdateUserProcessor(newUser));

            User newUser2 = getUser("1", "Maria", "Sharapova");
            //Existent Entry is updated on the cache...
            getUserCacheService().updateUser(newUser2.getId(), new UpdateUserProcessor(newUser2));

            Thread.sleep(15000);

            //Entry is deleted from cache...
            getUserCacheService().deleteUser(newUser.getId(), new DeleteUserProcessor());

            Thread.sleep(15000);
        }

        /**
         * Prints cache entries
         *
         */
        private void printCacheEntries() {
            Collection<User> userCollection = (Collection<User>) getUserCacheService().getUserCache().values();
            for (User user : userCollection) {
                log.debug("Cache Content : " + user);
            }
        }

        /**
         * Gets new user instance
         *
         * @param id user id
         * @param name user name
         * @param surname user surname
         * @return User user
         */
        private User getUser(String id, String name, String surname) {
            User user = getNewUserInstance();
            user.setId(id);
            user.setName(name);
            user.setSurname(surname);

            return user;
        }

        /** Gets user cache service... */
        public IUserCacheService getUserCacheService() {
            return userCacheService;
        }

        /** Sets user cache service... */
        public void setUserCacheService(IUserCacheService userCacheService) {
            this.userCacheService = userCacheService;
        }

        /** Gets a new instance of User Bean */
        public User getNewUserInstance() {
            return (User) getBeanFactory().getBean(SystemConstants.BEAN_NAME_USER);
        }

        /** Gets bean factory */
        public BeanFactory getBeanFactory() {
            return beanFactory;
        }

        /**
         * Sets bean factory
         *
         * @param beanFactory BeanFactory
         * @throws BeansException
         */
        public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
            this.beanFactory = beanFactory;
        }
    }
STEP 16 : CREATE Application CLASS
Application Class is created to run the application.
    package com.otv.exe;

    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;

    import com.otv.cache.updater.task.CacheUpdaterTask;
    import com.otv.common.SystemConstants;

    /**
     * Application Class
     *
     * @author onlinetechvision.com
     * @since 2 Jun 2012
     * @version 1.0.0
     *
     */
    public class Application {

        /**
         * Starts the application
         *
         * @param args String[]
         *
         */
        public static void main(String[] args) {
            ApplicationContext context = new ClassPathXmlApplicationContext(SystemConstants.APPLICATION_CONTEXT_FILE_NAME);

            CacheUpdaterTask cacheUpdaterTask = (CacheUpdaterTask) context.getBean(SystemConstants.BEAN_NAME_CACHE_UPDATER_TASK);

            Thread cacheUpdater = new Thread(cacheUpdaterTask);
            cacheUpdater.start();
        }
    }
STEP 17 : BUILD PROJECT
After the OTV_Spring_Coherence_With_Processor_and_POF project is built, OTV_Spring_Coherence-0.0.1-SNAPSHOT.jar is created.
Please note that the members of the cluster have different Coherence configurations, so the project should be built separately for each member.
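As a possible alternative to editing CacheUpdaterTask before each build, the member-specific routine could be selected at startup from a system property, while the member-specific override file is still chosen with -Dtangosol.coherence.override. The following is only a sketch of that idea: the property name otv.member.role and the class below are hypothetical and are not part of the project described in this article.

```java
// Sketch: picks the member-specific routine at startup instead of at build time.
class MemberRoutineSelectorSketch {

    // Hypothetical property name, e.g. -Dotv.member.role=OTV2 in the startup script.
    static final String ROLE_PROPERTY = "otv.member.role";

    enum Routine { FIRST_MEMBER, SECOND_MEMBER }

    // Any value other than "OTV2" (including a missing value) selects the first member's routine.
    static Routine routineFor(String role) {
        return "OTV2".equals(role) ? Routine.SECOND_MEMBER : Routine.FIRST_MEMBER;
    }

    public static void main(String[] args) {
        Routine routine = routineFor(System.getProperty(ROLE_PROPERTY));
        System.out.println("Selected routine: " + routine);
    }
}
```

In CacheUpdaterTask, the selected value would decide which of the two processRequests methods the run loop calls.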
STEP 18 : RUN PROJECT
After the OTV_Spring_Coherence-0.0.1-SNAPSHOT.jar file has been started on the members of the cluster, the following log output is shown on the first member's console:
    --After A new cluster is created and First Member joins the cluster, a new entry is added to the cache.
    02.06.2012 14:21:45 DEBUG (UserMapListener.java:33) - Inserted Key = 1, Value = Id : 1, Name : Bruce, Surname : Willis
    02.06.2012 14:21:45 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 1, Name : Bruce, Surname : Willis
    .......
    --After Second Member joins the cluster, a new entry is added to the cache.
    02.06.2012 14:21:45 DEBUG (UserMapListener.java:33) - Inserted Key = 2, Value = Id : 2, Name : Nathalie, Surname : Portman
    .......
    --Cache operations go on both first and second members of the cluster:
    02.06.2012 14:21:55 DEBUG (UserMapListener.java:42) - Updated Key = 1, New_Value = Id : 1, Name : Client, Surname : Eastwood, Old Value = Id : 1, Name : Bruce, Surname : Willis
    02.06.2012 14:21:55 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 2, Name : Nathalie, Surname : Portman
    02.06.2012 14:21:55 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 1, Name : Client, Surname : Eastwood

    02.06.2012 14:22:00 DEBUG (UserMapListener.java:42) - Updated Key = 2, New_Value = Id : 2, Name : Sharon, Surname : Stone, Old Value = Id : 2, Name : Nathalie, Surname : Portman
    02.06.2012 14:22:00 DEBUG (UserMapListener.java:42) - Updated Key = 1, New_Value = Id : 1, Name : Maria, Surname : Sharapova, Old Value = Id : 1, Name : Client, Surname : Eastwood

    02.06.2012 14:22:05 DEBUG (UserMapListener.java:24) - Deleted Key = 1, Value = Id : 1, Name : Maria, Surname : Sharapova
    02.06.2012 14:22:05 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 2, Name : Sharon, Surname : Stone

    02.06.2012 14:22:15 DEBUG (UserMapListener.java:24) - Deleted Key = 2, Value = Id : 2, Name : Sharon, Surname : Stone
    02.06.2012 14:22:15 DEBUG (UserMapListener.java:33) - Inserted Key = 1, Value = Id : 1, Name : Bruce, Surname : Willis
    02.06.2012 14:22:15 DEBUG (CacheUpdaterTask.java:116) - Cache Content : Id : 1, Name : Bruce, Surname : Willis
Further Reading :
Performing Transactions in Coherence
Using Portable Object Format in Coherence
Spring Framework Reference 3.x
Reference: How to distribute Spring Beans by using EntryProcessor and PortableObject features in Oracle Coherence from our JCG partner Eren Avsarogullari at the Online Technology Vision blog.