
Complex Event Processing – a beginner’s view

Using a Complex Event Processing (CEP) engine is not so complex. Well, initially at least. A substantial amount of information is available on the web about CEP products and functionality. But if you are like me, you want to test-run a product or application and have little patience for reading detailed documentation. So when I was evaluating CEP as an engine for one of our future products, I decided to just try it out using a business scenario I knew from my past experience working with a financial company. For impatient developers like me, what could be better than a free and open source product? So I decided to use ‘Esper’, an open source product based on Java, and was able to write the code (merely 3 Java classes) to address the business case below.

But first a little about CEP and a shameless plug of our product. My apologies. :-)
Complex Event Processing has been gaining significant ground recently. The benefits of CEP are widely understood in some verticals, such as the financial and insurance industries, where it is actively deployed to perform various business-critical tasks. Monitoring, fraud detection and algorithmic trading are some of the critical tasks that depend on CEP to integrate multiple streams of real-time data, identify patterns and generate actionable events for an organization.
My current employer, Sybase Inc., is one of the leading suppliers of CEP. Aleri, the Sybase CEP product, is widely used in the financial services industry and is the main component of Sybase’s leading solution, ‘RAP – The Trading Edition’. Aleri is also sold as a separate product. Detailed information about the product is available here.
http://www.sybase.com/products/financialservicessolutions/complex-event-processing.

The high level architecture of a CEP application is shown in the diagram below.

 
Now on to the best part: the business requirement. The aspect of CEP that fascinates me most is its ability to correlate events or data points from different streams or from within the same data stream. To elaborate, take the example of a retail bank that has a fraud monitoring system in place. The system flags every cash transaction over $10,000 for a manual review. This means that a single large cash transaction (a deposit or withdrawal) in an account raises an anti-money-laundering event in the monitoring system. Such traditional monitoring systems can easily be circumvented or exploited by simple tricks, such as making several deposits of smaller amounts. What happens if an account holder deposits 2 checks of $6,000 in a day, or 5 checks of $2,500 in a day? Nothing. The system can’t catch it, because no single transaction crosses the limit. CEP provides a way to define rules with a time frame criterion. For example, you could specify a rule that raises a flag when someone deposits more than $10,000 in total within a 12-hour window. Get it?
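To make the idea concrete before bringing in a CEP engine, here is a minimal plain-Java sketch of such a time-window rule. It is only an illustration of the concept, not how Esper or any other CEP product is implemented. The class `SlidingWindowMonitor`, its `record` method and the millisecond timestamps are names and choices I made up for this example, and it assumes deposits arrive in time order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a hand-rolled sliding time window per account.
// All names here are hypothetical and not part of any CEP product.
class SlidingWindowMonitor {

    // One deposit remembered inside the window.
    private static final class Deposit {
        final long timestampMillis;
        final long amount;

        Deposit(long timestampMillis, long amount) {
            this.timestampMillis = timestampMillis;
            this.amount = amount;
        }
    }

    private final long windowMillis;
    private final long threshold;
    private final Map<String, Deque<Deposit>> depositsByAccount = new HashMap<>();
    private final Map<String, Long> totalByAccount = new HashMap<>();

    SlidingWindowMonitor(long windowMillis, long threshold) {
        this.windowMillis = windowMillis;
        this.threshold = threshold;
    }

    // Records a deposit and returns true when the account's total inside
    // the window exceeds the threshold. Assumes non-decreasing timestamps.
    boolean record(String account, long amount, long nowMillis) {
        Deque<Deposit> window =
                depositsByAccount.computeIfAbsent(account, k -> new ArrayDeque<>());
        long total = totalByAccount.getOrDefault(account, 0L);

        // Drop deposits that have fallen out of the time window.
        while (!window.isEmpty()
                && window.peekFirst().timestampMillis <= nowMillis - windowMillis) {
            total -= window.pollFirst().amount;
        }

        window.addLast(new Deposit(nowMillis, amount));
        total += amount;
        totalByAccount.put(account, total);
        return total > threshold;
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        SlidingWindowMonitor monitor = new SlidingWindowMonitor(12 * hour, 10_000);
        System.out.println(monitor.record("AccountA", 6_000, 0));        // false
        System.out.println(monitor.record("AccountA", 6_000, 2 * hour)); // true
    }
}
```

Neither $6,000 deposit would trip a per-transaction limit of $10,000, but the second one pushes the 12-hour total to $12,000 and raises the flag. A CEP engine lets you state this kind of rule declaratively instead of maintaining the window by hand.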
Follow the steps below to see how easy it is to implement CEP to meet this business requirement.
Download the latest Esper version (4.5.0 at the time of this writing) from here.
http://espertech.com/download/
Unzip the package into a separate folder.
Create a Java project and reference the Esper jar files from this folder.
Create a standard Java bean for the event, which here is a deposit with account name and amount attributes.

package com.sybase.testTools.util;

public class DepositEvent {
 private String accountName;
 private int amount;

 public DepositEvent(String accountName, int amount) {
  this.accountName = accountName;
  this.amount = amount;
 }

 public String getAccountName() {
  return accountName;
 }

 public int getAmount() {
  return amount;
 }
}

The next listing creates the event type, the SQL-like query that selects events, and a listener registered on that query. The code raises an event any time the deposits into one of the two accounts, AccountA and AccountB, add up to more than 100000 within a time frame of 10 seconds (this is where you specify the time window). Because this is just a test, I have put the event generation functionality together with the other code, but in real life the deposit amounts would be fed from a deposit transaction processing system through some messaging framework. The code is easy enough to follow. First we create the initial configuration. Then we add the type of event we want. A query with the criterion for selecting events is created next. As you can see, the amount is summed over a sliding window of 10 seconds, and the query produces an event when the total for a particular account in that time frame exceeds 100000. A listener is created next and registered on the query.

package com.sybase.testTools;

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.sybase.testTools.util.MyListener;
import com.sybase.testTools.util.DepositEvent;

public class EsperTest {
 public static void main(String[] args) {
  try {
   Configuration config = new Configuration();

   config.addEventType("DepositEvent",
     com.sybase.testTools.util.DepositEvent.class
       .getName());
   EPServiceProvider epService = EPServiceProviderManager
     .getDefaultProvider(config);
   String expression = "select accountName, sum(amount) from com.sybase.testTools.util.DepositEvent.win:time(10 seconds)"
     + " group by accountName having sum(amount) > 100000";

   EPStatement statement = epService.getEPAdministrator().createEPL(
     expression);
   MyListener listener = new MyListener();
   statement.addListener(listener);
   int amount = 0;
   for (int i = 0; i < 1000; i++) {
    amount = i;
    DepositEvent event;
    if (i % 2 == 0) {
     event = new DepositEvent("AccountA", amount);
    } else {
     event = new DepositEvent("AccountB", amount);
    }
    epService.getEPRuntime().sendEvent(event);
   }

  } catch (Exception e) {
   e.printStackTrace();
  }
 }
}
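If you want to know what to expect before you run the test: the loop finishes long before the 10 seconds are up, so I would expect every deposit to still be inside the window and each account's sum to behave like a plain running total. The sketch below reproduces just that arithmetic in plain Java, without Esper. `FeedArithmetic` and `firstCrossing` are names I made up for illustration, and the sketch assumes that no event expires from the window during the run.

```java
// Illustrative only: replays the test feed's arithmetic without Esper.
// Assumes the whole loop completes inside a single 10-second window.
class FeedArithmetic {

    // Returns the first loop index i at which the chosen account's running
    // total exceeds the threshold, or -1 if it never does.
    // evenAccount = true follows AccountA (even i), false follows AccountB (odd i).
    static int firstCrossing(boolean evenAccount, long threshold) {
        long total = 0;
        for (int i = 0; i < 1000; i++) {
            boolean isEven = (i % 2 == 0);
            if (isEven != evenAccount) {
                continue; // this deposit belongs to the other account
            }
            total += i;
            if (total > threshold) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("AccountA crosses at i=" + firstCrossing(true, 100_000));  // 632
        System.out.println("AccountB crosses at i=" + firstCrossing(false, 100_000)); // 633
    }
}
```

Under that assumption, the listener should stay silent for the first few hundred deposits and start reporting AccountA at the deposit of 632 and AccountB at the deposit of 633. A real Esper run may differ in detail, for example if the machine is slow enough for events to leave the window.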

The next listing is the listener. Every time the query produces a result for the time window specified in the query, Esper calls the listener's update method and passes the new result rows in the newEvents array.

package com.sybase.testTools.util;

import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

public class MyListener implements UpdateListener {
 public void update(EventBean[] newEvents, EventBean[] oldEvents) {
  try {
   if (newEvents == null) {

    return;
   }
   EventBean event = newEvents[0];
   System.out.println("Account: " + event.get("accountName")
     + ", exceeded the sum, actual " + event.get("sum(amount)"));
  } catch (Exception e) {
   e.printStackTrace();
  }

 }

}

Easy enough, right? The expression language itself is fairly easy to understand because of its similarity to standard SQL syntax. Although a real-life implementation could become complex depending on the type and number of feeds and events you want to monitor, the product itself is simple enough to understand. Many of the commercial CEP products offer an excellent user interface for creating event types, queries and reports.

Complex event processing is still a growing field, and the pace of its adoption will only increase as companies try to make sense of all the streams of data flowing in. The amount of semi-structured and other types of data (audio, video) has already surpassed the amount of traditional relational data. It’s easy to gauge the impact of a good CEP application at a time when stock trading companies are already gleaning clues from tweets on Twitter.

Reference: Complex Event Processing – a beginner’s view from our JCG partner Mahesh Gadgil at the Simple yet Practical blog.
