Coherence Event Processing by using Map Trigger Feature
This article shows how to process Coherence events by using Map Triggers. Basically, Distributed Data Management in Oracle Coherence is suggested to look over basic configuration and implementation of Oracle Coherence API
Map Triggers are one of the most important features of Oracle Coherence to provide a highly customized cache management system. MapTrigger represents a functional agent that allows to validate, reject or modify mutating operations against an underlying map. Also, they can prevent invalid transactions, enforce security, provide event logging and auditing, and gather statistics on data modifications.
For example, we have code that is working with a NamedCache, and we want to change an entry’s behavior or contents before the entry is inserted into the map. This change can be made without modifying all the existing code by enabling a map trigger.
There are two ways to add Map Triggers feature to application :
1) A MapTriggerListener can be used to register a MapTrigger with a Named Cache
2) The class-factory mechanism can be used in the coherence-cache-config.xml configuration file
In the following sample application, MapTrigger functionality is implemented by following the first way. A new cluster called OTV, is created and User bean is distributed by user-map NamedCache object used among two members of the cluster.
Used Technologies :
JDK 1.6.0_35
Spring 3.1.2
Coherence 3.7.1
Maven 3.0.2
STEP 1 : CREATE MAVEN PROJECT
A maven project is created as below. (It can be created by using Maven or IDE Plug-in).
Coherence is downloaded via Coherence Package
STEP 3 : LIBRARIES
Firstly, Spring dependencies are added to Maven’ s pom.xml.
<!-- Spring 3.1.2 dependencies --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${spring.version}</version> </dependency>
Coherence library is installed to Local Maven Repository manually and its description is added to pom.xml as below. Also if Maven is not used to manage the project, coherence.jar file can be added to classpath.
<!-- Coherence library(from local repository) --> <dependency> <groupId>com.tangosol</groupId> <artifactId>coherence</artifactId> <version>3.7.1</version> </dependency>
For creating runnable-jar, the following Maven plugin can be used.
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>1.3.1</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation='org.apache.maven.plugins.shade.resource.ManifestResourceTransformer'> <mainClass>com.otv.exe.Application</mainClass> </transformer> <transformer implementation='org.apache.maven.plugins.shade.resource.AppendingTransformer'> <resource>META-INF/spring.handlers</resource> </transformer> <transformer implementation='org.apache.maven.plugins.shade.resource.AppendingTransformer'> <resource>META-INF/spring.schemas</resource> </transformer> </transformers> </configuration> </execution> </executions> </plugin>
STEP 4 : CREATE otv-coherence-cache-config.xml
First Coherence configuration file is otv-coherence-cache-config.xml. It contains caching-schemes(distributed or replicated) and caching-scheme-mapping configuration. Created cache configuration should be added to coherence-cache-config.xml.
<?xml version='1.0'?> <cache-config xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' xmlns='http://xmlns.oracle.com/coherence/coherence-cache-config' xsi:schemaLocation='http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd'> <caching-scheme-mapping> <cache-mapping> <cache-name>user-map</cache-name> <scheme-name>MapDistCache</scheme-name> </cache-mapping> </caching-scheme-mapping> <caching-schemes> <distributed-scheme> <scheme-name>MapDistCache</scheme-name> <service-name>MapDistCache</service-name> <backing-map-scheme> <local-scheme> <unit-calculator>BINARY</unit-calculator> </local-scheme> </backing-map-scheme> <autostart>true</autostart> </distributed-scheme> </caching-schemes> </cache-config>
STEP 5 : CREATE tangosol-coherence-override.xml
Second Coherence configuration file is tangosol-coherence-override.xml. It contains cluster, member-identity and configurable-cache-factory configuration.
tangosol-coherence-override.xml for first member of the cluster :
<?xml version='1.0'?> <coherence xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' xmlns='http://xmlns.oracle.com/coherence/coherence-operational-config' xsi:schemaLocation='http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd'> <cluster-config> <member-identity> <cluster-name>OTV</cluster-name> <role-name>OTV1</role-name> </member-identity> <unicast-listener> <well-known-addresses> <socket-address id='1'> <address>x.x.x.x</address> <port>8089</port> </socket-address> <socket-address id='2'> <address>x.x.x.x</address> <port>8090</port> </socket-address> </well-known-addresses> <machine-id>1001</machine-id> <address>x.x.x.x</address> <port>8089</port> <port-auto-adjust>true</port-auto-adjust> </unicast-listener> </cluster-config> <configurable-cache-factory-config> <init-params> <init-param> <param-type>java.lang.String</param-type> <param-value system-property='tangosol.coherence.cacheconfig'> otv-coherence-cache-config.xml</param-value> </init-param> </init-params> </configurable-cache-factory-config> </coherence>
tangosol-coherence-override.xml for second member of the cluster :
<?xml version='1.0'?> <coherence xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' xmlns='http://xmlns.oracle.com/coherence/coherence-operational-config' xsi:schemaLocation='http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd'> <cluster-config> <member-identity> <cluster-name>OTV</cluster-name> <role-name>OTV2</role-name> </member-identity> <unicast-listener> <well-known-addresses> <socket-address id='1'> <address>x.x.x.x</address> <port>8090</port> </socket-address> <socket-address id='2'> <address>x.x.x.x</address> <port>8089</port> </socket-address> </well-known-addresses> <machine-id>1002</machine-id> <address>x.x.x.x</address> <port>8090</port> <port-auto-adjust>true</port-auto-adjust> </unicast-listener> </cluster-config> <configurable-cache-factory-config> <init-params> <init-param> <param-type>java.lang.String</param-type> <param-value system-property='tangosol.coherence.cacheconfig'> otv-coherence-cache-config.xml</param-value> </init-param> </init-params> </configurable-cache-factory-config> </coherence>
STEP 6 : CREATE applicationContext.xml
Spring Configuration file, applicationContext.xml, is created.
<beans xmlns='http://www.springframework.org/schema/beans' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' xsi:schemaLocation='http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd'> <!-- Beans Declaration --> <bean id='userCacheService' class='com.otv.srv.UserCacheService'></bean> <bean id='userCacheUpdater' class='com.otv.exe.UserCacheUpdater'> <property name='userCacheService' ref='userCacheService' /> </bean> </beans>
STEP 7 : CREATE User CLASS
A new User Spring bean is created. This bean will be distributed between two node in OTV cluster. For serializing, java.io.Serializable interface has been implemented but PortableObject can be implemented for better performance.
package com.otv.user; import java.io.Serializable; /** * User Bean * * @author onlinetechvision.com * @since 29 Oct 2012 * @version 1.0.0 * */ public class User implements Serializable { private static final long serialVersionUID = -1963764656789800896L; private String id; private String name; private String surname; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getSurname() { return surname; } public void setSurname(String surname) { this.surname = surname; } @Override public String toString() { StringBuilder strBuff = new StringBuilder(); strBuff.append('id : ').append(id); strBuff.append(', name : ').append(name); strBuff.append(', surname : ').append(surname); return strBuff.toString(); } }
STEP 8 : CREATE IUserCacheService INTERFACE
A new IUserCacheService Interface is created for service layer to expose cache functionality.
package com.otv.srv; import com.tangosol.net.NamedCache; /** * IUserCacheService Interface exposes User Cache operations * * @author onlinetechvision.com * @since 29 Oct 2012 * @version 1.0.0 * */ public interface IUserCacheService { /** * Adds user entries to cache * * @param Object key * @param Object value * */ void addToUserCache(Object key, Object value); /** * Deletes user entries from cache * * @param Object key * */ void deleteFromUserCache(Object key); /** * Gets user cache * * @retun NamedCache Coherence named cache */ NamedCache getUserCache(); }
STEP 9 : CREATE UserCacheService IMPL CLASS
UserCacheService is created by implementing IUserCacheService.
package com.otv.srv; import com.otv.listener.UserMapListener; import com.otv.trigger.UserMapTrigger; import com.tangosol.net.CacheFactory; import com.tangosol.net.NamedCache; import com.tangosol.util.MapTriggerListener; /** * CacheService Class implements the ICacheService * * @author onlinetechvision.com * @since 29 Oct 2012 * @version 1.0.0 * */ public class UserCacheService implements IUserCacheService { private NamedCache userCache = null; private static final String USER_MAP = 'user-map'; private static final long LOCK_TIMEOUT = -1; public UserCacheService() { setUserCache(CacheFactory.getCache(USER_MAP)); getUserCache().addMapListener(new UserMapListener()); getUserCache().addMapListener(new MapTriggerListener(new UserMapTrigger())); } /** * Adds user entries to cache * * @param Object key * @param Object value * */ public void addToUserCache(Object key, Object value) { // key is locked getUserCache().lock(key, LOCK_TIMEOUT); try { // application logic getUserCache().put(key, value); } finally { // key is unlocked getUserCache().unlock(key); } } /** * Deletes user entries from cache * * @param Object key * */ public void deleteFromUserCache(Object key) { // key is locked getUserCache().lock(key, LOCK_TIMEOUT); try { // application logic getUserCache().remove(key); } finally { // key is unlocked getUserCache().unlock(key); } } /** * Gets user cache * * @retun NamedCache Coherence named cache */ public NamedCache getUserCache() { return userCache; } public void setUserCache(NamedCache userCache) { this.userCache = userCache; } }
STEP 10 : CREATE UserMapTrigger CLASS
A new UserMapTrigger class is created by implementing com.tangosol.util.MapTrigger Interface. This trigger processes the logic before the entry is inserted into the user-map.
package com.otv.trigger; import org.apache.log4j.Logger; import com.otv.listener.UserMapListener; import com.otv.user.User; import com.tangosol.util.MapTrigger; /** * UserMapTrigger executes required logic before the operation is committed * * @author onlinetechvision.com * @since 29 Oct 2012 * @version 1.0.0 * */ public class UserMapTrigger implements MapTrigger { private static final long serialVersionUID = 5411263646665358790L; private static Logger logger = Logger.getLogger(UserMapListener.class); /** * Processes user cache entries * * @param MapTrigger.Entry entry * */ public void process(MapTrigger.Entry entry) { User user = (User) entry.getValue(); String id = user.getId(); String name = user.getName(); String updatedName = name.toUpperCase(); String surname = user.getSurname(); String updatedSurname = surname.toUpperCase(); if (!updatedName.equals(name)) { user.setName(updatedName); } if (!updatedSurname.equals(surname)) { user.setSurname(updatedSurname); } user.setId(user.getName() + '_' + user.getSurname()); entry.setValue(user); logger.debug('UserMapTrigger processes the entry before committing. ' + 'oldId : ' + id + ', newId : ' + ((User)entry.getValue()).getId() + ', oldName : ' + name + ', newName : ' + ((User)entry.getValue()).getName() + ', oldSurname : ' + surname + ', newSurname : ' + ((User)entry.getValue()).getSurname() ); } public boolean equals(Object o) { return o != null && o.getClass() == this.getClass(); } public int hashCode() { return getClass().getName().hashCode(); } }
STEP 11 : CREATE USERMAPLISTENER IMPL CLASS
A new UserMapListener class is created. This listener receives distributed user-map events.
package com.otv.listener; import org.apache.log4j.Logger; import com.tangosol.util.MapEvent; import com.tangosol.util.MapListener; /** * UserMapListener Class listens user cache events * * @author onlinetechvision.com * @since 29 Oct 2012 * @version 1.0.0 * */ public class UserMapListener implements MapListener { private static Logger logger = Logger.getLogger(UserMapListener.class); public void entryDeleted(MapEvent me) { logger.debug('Deleted Key = ' + me.getKey() + ', Value = ' + me.getOldValue()); } public void entryInserted(MapEvent me) { logger.debug('Inserted Key = ' + me.getKey() + ', Value = ' + me.getNewValue()); } public void entryUpdated(MapEvent me) { // logger.debug('Updated Key = ' + me.getKey() + ', New_Value = ' + me.getNewValue() + ', Old Value = ' + me.getOldValue()); } }
STEP 12 : CREATE CacheUpdater CLASS
CacheUpdater Class is created to add new entry to cache and monitor cache content.
package com.otv.exe; import java.util.Collection; import org.apache.log4j.Logger; import com.otv.srv.IUserCacheService; import com.otv.user.User; /** * CacheUpdater Class updates and prints user cache entries * * @author onlinetechvision.com * @since 29 Oct 2012 * @version 1.0.0 * */ public class UserCacheUpdater implements Runnable { private static Logger logger = Logger.getLogger(UserCacheUpdater.class); private IUserCacheService userCacheService; /** * Runs the UserCacheUpdater Thread * */ public void run() { //New User are created... User user = new User(); //Only Name and Surname properties are set and Id property will be set at trigger level. user.setName('James'); user.setSurname('Joyce'); //Entries are added to cache... getUserCacheService().addToUserCache('user1', user); // The following code block shows the entry which will be inserted via second member of the cluster // so it should be opened and above code block should be commented-out before the project is built. // user.setName('Thomas'); // user.setSurname('Moore'); // getUserCacheService().addToUserCache('user2', user); //Cache Entries are being printed... printCacheEntries(); } /** * Prints User Cache Entries * */ @SuppressWarnings('unchecked') private void printCacheEntries() { Collection<User> userCollection = null; try { while(true) { userCollection = (Collection<User>)getUserCacheService().getUserCache().values(); for(User user : userCollection) { logger.debug('Cache Content : '+user); } Thread.sleep(60000); } } catch (InterruptedException e) { logger.error('CacheUpdater is interrupted!', e); } } public IUserCacheService getUserCacheService() { return userCacheService; } public void setUserCacheService(IUserCacheService userCacheService) { this.userCacheService = userCacheService; } }
STEP 13 : CREATE Application CLASS
Application Class is created to run the application.
package com.otv.exe; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * Application class starts the application * * @author onlinetechvision.com * @since 29 Oct 2012 * @version 1.0.0 * */ public class Application { /** * Starts the application * * @param String[] args * */ public static void main(String[] args) { ApplicationContext context = new ClassPathXmlApplicationContext('applicationContext.xml'); UserCacheUpdater cacheUpdater = (UserCacheUpdater) context.getBean('userCacheUpdater'); new Thread(cacheUpdater).start(); } }
nbsp;
STEP 14 : BUILD PROJECT
After OTV_Spring_Coherence_MapTrigger Project is build, OTV_Spring_Coherence_MapTrigger-0.0.1-SNAPSHOT.jar will be created.
Important Note : The Members of the cluster have got different configuration for Coherence so the project should be built separately for each member.
STEP 15 : RUN PROJECT BY STARTING ON MEMBER OF THE CLUSTER
After created OTV_Spring_Coherence-0.0.1-SNAPSHOT.jar file is run at the members of the cluster, below output logs will be shown on first member’ s console:
--A new cluster is created and First Member joins the cluster and adds a new entry to the cache. 29.10.2012 18:26:44 DEBUG (UserMapTrigger.java:49) - UserMapTrigger processes the entry before committing. oldId : null, newId : JAMES_JOYCE , oldName : James, newName : JAMES, oldSurname : Joyce, newSurname : JOYCE 29.10.2012 18:26:44 DEBUG (UserMapListener.java:25) - Inserted Key = user1, Value = id : JAMES_JOYCE, name : JAMES, surname : JOYCE 29.10.2012 18:26:44 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE ....... --Second Member joins the cluster and adds a new entry to the cache. 29.10.2012 18:27:33 DEBUG (UserMapTrigger.java:49) - UserMapTrigger processes the entry before committing. oldId : null, newId : THOMAS_MOORE, oldName : Thomas, newName : THOMAS, oldSurname : Moore, newSurname : MOORE 29.10.2012 18:27:34 DEBUG (UserMapListener.java:25) - Inserted Key = user2, Value = id : THOMAS_MOORE, name : THOMAS, surname : MOORE ....... --After second member adds a new entry, cache content is shown as below : 29.10.2012 18:27:44 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE 29.10.2012 18:27:45 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE 29.10.2012 18:28:45 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE 29.10.2012 18:28:45 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE
Second member’ s console :
--After Second Member joins the cluster and adds a new entry to the cache, cache content is shown as below and the members has got same entries :. 29.10.2012 18:27:34 DEBUG (UserMapListener.java:25) - Inserted Key = user2, Value = id : THOMAS_MOORE, name : THOMAS, surname : MOORE 29.10.2012 18:27:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE 29.10.2012 18:27:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE 29.10.2012 18:28:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : JAMES_JOYCE, name : JAMES, surname : JOYCE 29.10.2012 18:28:34 DEBUG (UserCacheUpdater.java:63) - Cache Content : id : THOMAS_MOORE, name : THOMAS, surname : MOORE
STEP 16 : DOWNLOAD
https://github.com/erenavsarogullari/OTV_Spring_Coherence_MapTrigger
Reference: Coherence Event Processing by using Map Trigger Feature from our JCG partner Eren Avsarogullari at the Online Technology Vision blog.