Aleri – Complex Event Processing
In my previous blog post on Complex Event Processing, I demonstrated the use of Esper, the open source CEP software, and the Twitter4J API to handle a stream of tweets from Twitter. A CEP product is much more than handling just one stream of data, though. A single stream of data could easily be handled through standard asynchronous messaging platforms and does not pose very challenging scalability or latency issues. But when it comes to consuming more than one real-time stream of data, analyzing it in real time, and correlating the streams with one another, nothing beats a CEP platform. The sources feeding a streaming platform can vary in speed, volume and complexity. A true enterprise-class CEP product should deal with high-speed real-time data like stock tickers and with slower but voluminous offline batch uploads with equal ease. Apart from providing standard interfaces, a CEP product should also provide an easy programming language for querying the streaming data and for generating continuous intelligence through features such as pattern matching and snapshot querying.
To keep it simple and high level, CEP can be broken down into three basic parts. The first is the mechanism that grabs, or consumes, the source data. The second is the process of investigating that data and identifying events and patterns. The third is interacting with target systems by providing them with actionable items. Actionable events take different forms and formats depending on what you are using CEP for. An action item could be selling an equity position based on calculated risk in a risk monitoring application, flagging potential fraud in an anti-money-laundering application, or raising an alert about a catastrophic event in a monitoring system that reads thousands of sensors in a chemical plant. There are literally thousands of scenarios where manual, offline inspection of data is simply not an option. After you go through the following section, you may want to try Aleri yourself. This link http://www.sybase.com/aleriform takes you directly to the Aleri download page. An evaluation copy, valid for 90 days, is freely available from Sybase’s official website. A good amount of documentation, an excellent tutorial and some sample code on the website should help you get started quickly.
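The three parts can be sketched as a tiny, generic Java pipeline. This is a hypothetical illustration rather than Aleri code: `CepPipelineSketch` and `run` are made-up names, and the detector is a plain function, whereas a real CEP engine keeps state across events and correlates several streams.

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

class CepPipelineSketch {
    // Runs the three parts for every event and returns how many actions were taken.
    static <E, A> int run(Iterator<E> source,                 // part 1: consume source data
                          Function<E, Optional<A>> detector,  // part 2: investigate, emit an actionable item or nothing
                          Consumer<A> target) {               // part 3: hand the item to a target system
        int actions = 0;
        while (source.hasNext()) {
            Optional<A> action = detector.apply(source.next());
            if (action.isPresent()) {
                target.accept(action.get());
                actions++;
            }
        }
        return actions;
    }
}
```

For example, passing plant temperature readings 80.0, 95.5, 120.2 and 70.1, a detector that emits `"ALERT " + t` for readings above a hypothetical 100-degree threshold, and `alerts::add` as the target leaves a single `ALERT 120.2` in `alerts` and returns 1.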
If you are an existing user of any CEP product, I encourage you to compare Aleri with that product and share your findings with the community or in the comments on this blog. By somewhat dated estimates, Tibco is the biggest CEP vendor in the market. I am not sure how much market share another leading product, StreamBase, has. There is also a webinar on YouTube.com that explains the benefits of CEP in general and some key features of StreamBase in particular. For newcomers, it serves as an excellent introduction to CEP and to a capital markets use case.
An application on Aleri CEP is built by creating a model, either in the Studio (the GUI), in Splash (the language), or in the Aleri Modeling Language (ML), which is the final stage of the model before it is deployed.
Following is a list of the key features of Splash.
- Data Types – Supports standard data types and XML. Also supports ‘typedef’ for user-defined data types.
- Access Control – granular access control that can grant access to an individual stream or to a module (which contains many streams)
- SQL – another way of building a model. Building a model in Aleri Studio can take longer because of its visual paradigm; someone proficient with SQL should be able to do it much faster using Aleri SQL, which is very similar to the regular SQL we all know.
- Joins – supported joins are Inner, Left, Right and Full
- Filter expressions – Where, Having and Group Having
- ML – Aleri SQL produces a data model in the Aleri Modeling Language (ML). Proficient ML users might use ML alone (in place of Aleri Studio and Aleri SQL) to build a model.
- The pattern matching language – includes constructs such as ‘within’ to indicate an interval (sliding window), ‘from’ to indicate the streams of data, and the interesting ‘fby’, which indicates a sequence (followed by).
- User-defined functions – the user-defined function interface in Splash allows you to create functions in C++ or Java and use them within a Splash expression in the model.
Advanced pattern matching capabilities are best explained through examples. The following four code segments and their explanations are taken from Sybase’s documentation on Aleri.
The first example checks whether a broker places a buy order on a stock for his or her own account, then inserts a buy order on the same stock for a customer, and then sells that stock. It creates a “buyahead” event when those actions occur in that sequence.
within 5 minutes
from
  BuyStock[Symbol=sym; Shares=n1; Broker=b; Customer=c0] as Buy1,
  BuyStock[Symbol=sym; Shares=n2; Broker=b; Customer=c1] as Buy2,
  SellStock[Symbol=sym; Shares=n1; Broker=b; Customer=c0] as Sell
on Buy1 fby Buy2 fby Sell
{
  if ((b = c0) and (b != c1)) {
    output [Symbol=sym; Shares=n1; Broker=b];
  }
}
This example checks for three events, one following the other, using the fby relationship. Because the same variable sym is used in all three patterns, the values in the three events must be the same. Different variables might have the same value, though (e.g., n1 and n2). It outputs an event if the Broker and Customer from the Buy1 and Sell events are the same, and the Customer from the Buy2 event is different.
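To make `fby` and `within` concrete, here is a minimal Java sketch of the same buy-ahead check over an in-memory list of trades. It is not how Aleri evaluates patterns; `BuyAheadSketch` and `Trade` are hypothetical names, the trades are assumed to be ordered by time, and `within 5 minutes` is assumed to mean that the whole Buy1-to-Sell sequence fits inside five minutes.

```java
import java.util.ArrayList;
import java.util.List;

class BuyAheadSketch {
    // Hypothetical trade event: buy == true for BuyStock, false for SellStock.
    static final class Trade {
        final boolean buy;
        final String symbol;
        final int shares;
        final String broker;
        final String customer;
        final long timeMillis;

        Trade(boolean buy, String symbol, int shares, String broker, String customer, long timeMillis) {
            this.buy = buy;
            this.symbol = symbol;
            this.shares = shares;
            this.broker = broker;
            this.customer = customer;
            this.timeMillis = timeMillis;
        }
    }

    static final long WINDOW_MILLIS = 5L * 60 * 1000; // "within 5 minutes"

    // Assumes trades are ordered by time, so "x fby y" means y appears later in the list.
    static List<String> detect(List<Trade> trades) {
        List<String> alerts = new ArrayList<>();
        for (int i = 0; i < trades.size(); i++) {
            Trade buy1 = trades.get(i);
            // Buy1: broker b buys on his or her own account (b = c0).
            if (!buy1.buy || !buy1.broker.equals(buy1.customer)) continue;
            for (int j = i + 1; j < trades.size(); j++) {
                Trade buy2 = trades.get(j);
                // Buy2: same symbol and broker, for a different customer (b != c1).
                if (!buy2.buy || !buy2.symbol.equals(buy1.symbol)
                        || !buy2.broker.equals(buy1.broker) || buy2.customer.equals(buy1.broker)) continue;
                for (int k = j + 1; k < trades.size(); k++) {
                    Trade sell = trades.get(k);
                    // Sell: same symbol, shares (n1), broker and customer (c0) as Buy1.
                    boolean matches = !sell.buy && sell.symbol.equals(buy1.symbol)
                            && sell.shares == buy1.shares && sell.broker.equals(buy1.broker)
                            && sell.customer.equals(buy1.customer);
                    // The whole sequence must fit inside the window.
                    if (matches && sell.timeMillis - buy1.timeMillis <= WINDOW_MILLIS) {
                        alerts.add(buy1.symbol + ";" + buy1.shares + ";" + buy1.broker);
                    }
                }
            }
        }
        return alerts;
    }
}
```

With trades (buy IBM 100, broker and customer bob, at 0 s), (buy IBM 5000, broker bob for alice, at 60 s) and (sell IBM 100, bob for bob, at 120 s), `detect` returns a single `IBM;100;bob`; move the sell to the sixth minute and nothing is reported. The brute-force triple loop is only for readability; a real engine would typically keep partial matches as state instead of rescanning history.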
The next example shows Boolean operations on events. The rule describes a possible theft condition: a product reading on a shelf (possibly through RFID), followed by the non-occurrence of a checkout of that product, followed by a reading of the product at a scanner near the door.
within 12 hours
from
  ShelfReading[TagId=tag; ProductName=pname] as onShelf,
  CounterReading[TagId=tag] as checkout,
  ExitReading[TagId=tag; AreaId=area] as exit
on onShelf fby not(checkout) fby exit
output [TagId=tag; ProductName=pname; AreaId=area];
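The `not(checkout)` part is the interesting bit: the pattern should only match if no checkout reading of the same tag arrives between the shelf reading and the exit reading. A hypothetical Java sketch of that logic, assuming time-ordered readings and reporting only the first exit after each shelf reading, could look like this (the class and field names are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

class TheftSketch {
    // Hypothetical reading; kind is "SHELF", "COUNTER" (checkout) or "EXIT".
    static final class Reading {
        final String kind;
        final String tagId;
        final String productName; // only meaningful for SHELF readings
        final String areaId;      // only meaningful for EXIT readings
        final long timeMillis;

        Reading(String kind, String tagId, String productName, String areaId, long timeMillis) {
            this.kind = kind;
            this.tagId = tagId;
            this.productName = productName;
            this.areaId = areaId;
            this.timeMillis = timeMillis;
        }
    }

    static final long WINDOW_MILLIS = 12L * 60 * 60 * 1000; // "within 12 hours"

    // For each shelf reading, scan forward through readings of the same tag:
    // a checkout first means no alert (not(checkout) fails); an exit first,
    // inside the window, produces "tag;product;area".
    static List<String> detect(List<Reading> readings) {
        List<String> alerts = new ArrayList<>();
        for (int i = 0; i < readings.size(); i++) {
            Reading onShelf = readings.get(i);
            if (!onShelf.kind.equals("SHELF")) continue;
            for (int j = i + 1; j < readings.size(); j++) {
                Reading r = readings.get(j);
                if (!r.tagId.equals(onShelf.tagId)) continue;
                if (r.timeMillis - onShelf.timeMillis > WINDOW_MILLIS) break;
                if (r.kind.equals("COUNTER")) break;
                if (r.kind.equals("EXIT")) {
                    alerts.add(onShelf.tagId + ";" + onShelf.productName + ";" + r.areaId);
                    break;
                }
            }
        }
        return alerts;
    }
}
```

A tag that is checked out at the counter before reaching the door produces no alert, while a tag that goes straight from the shelf to the exit scanner within 12 hours does.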
The next example shows how to raise an alert if a user tries to log in to an account unsuccessfully three times within 5 minutes.
within 5 minutes
from
  LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login1,
  LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login2,
  LoginAttempt[IpAddress=ip; Account=acct; Result=0] as login3,
  LoginAttempt[IpAddress=ip; Account=acct; Result=1] as login4
on (login1 fby login2 fby login3) and not(login4)
output [Account=acct];
People wishing to break into computer systems often scan a number of TCP/IP ports for an open one and attempt to exploit vulnerabilities in the programs listening on those ports. Here’s a rule that checks whether a single IP address has attempted connections on three ports, and whether those have been followed by the use of the “sendmail” program.
within 30 minutes
from
  Connect[Source=ip; Port=22] as c1,
  Connect[Source=ip; Port=23] as c2,
  Connect[Source=ip; Port=25] as c3,
  SendMail[Source=ip] as send
on (c1 and c2 and c3) fby send
output [Source=ip];
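Here `c1 and c2 and c3` does not care about the order of the three connections, while `fby send` requires all three before the sendmail event. The hypothetical Java sketch below, assuming time-ordered input, keeps the latest connection time per IP address and port and checks the three ports whenever a sendmail event arrives:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PortScanSketch {
    // Hypothetical event: type is "CONNECT" (with a port) or "SENDMAIL" (port unused).
    static final class Event {
        final String type;
        final String sourceIp;
        final int port;
        final long timeMillis;

        Event(String type, String sourceIp, int port, long timeMillis) {
            this.type = type;
            this.sourceIp = sourceIp;
            this.port = port;
            this.timeMillis = timeMillis;
        }
    }

    static final int[] PORTS = {22, 23, 25};
    static final long WINDOW_MILLIS = 30L * 60 * 1000; // "within 30 minutes"

    // Returns the source IP for every sendmail event preceded by connects on all three ports.
    static List<String> detect(List<Event> events) {
        List<String> alerts = new ArrayList<>();
        Map<String, Long> lastConnect = new HashMap<>(); // "ip:port" -> time of latest connect
        for (Event e : events) {
            if (e.type.equals("CONNECT")) {
                lastConnect.put(e.sourceIp + ":" + e.port, e.timeMillis);
            } else if (e.type.equals("SENDMAIL")) {
                boolean matched = true;
                for (int port : PORTS) {
                    Long t = lastConnect.get(e.sourceIp + ":" + port);
                    // Each required connect must precede the send and fall inside the window.
                    if (t == null || e.timeMillis - t > WINDOW_MILLIS) {
                        matched = false;
                        break;
                    }
                }
                if (matched) alerts.add(e.sourceIp);
            }
        }
        return alerts;
    }
}
```

Keeping only the latest connect per port is a deliberate choice: it gives the shortest possible match ending at the sendmail event, so if that one does not fit into 30 minutes, no other combination will.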
Aleri provides many interfaces out of the box for easy integration with source and target systems. Through these interfaces, or adapters, the Aleri platform can communicate with standard relational databases, messaging frameworks like IBM MQ, sockets and file system files. Data in various formats such as CSV, FIX, Reuters market data, SOAP, HTTP and SMTP is easily consumed by Aleri through standardized interfaces.
The following techniques are available for integrating Aleri with other systems:
- Pub/sub API – a standard publish/subscribe mechanism, provided for Java, C++ and .NET
- SQL interface – SELECT, UPDATE, DELETE and INSERT statements used through ODBC and JDBC connections
- Built in adapters for market data and FIX
In the next part of this series, we will look at Aleri Studio, the GUI that helps us build a CEP application the easy way.
Aleri, the complex event processing platform from Sybase, was reviewed at a high level in my last post.
This week, let’s review Aleri Studio, the user interface to the Aleri platform, and the pub/sub API, one of the many ways to interface with the Aleri platform. The studio is an integral part of the platform and comes packaged with the free evaluation copy. If you haven’t already done so, please download a copy from the Sybase website (http://www.sybase.com/aleriform). The installation process is fairly easy and gets you up and running in a few minutes.
Aleri Studio is an authoring platform for building the model that defines interactions and sequencing between various data streams. It can also merge multiple streams to form one or more streams. With this Eclipse-based studio, you can test the models you build by feeding them test data and monitor the activity inside the streams in real time. Let’s look at the various types of streams you can define in Aleri and their functionality.
Source Stream – Only this type of stream can handle incoming data. The operations that incoming data can perform on it are insert, update, delete and upsert. Upsert, as the name suggests, updates a row if the key defining that row is already present in the stream; otherwise, it inserts a new record into the stream.
Aggregate Stream – This stream creates a summary record for each group defined by a specific attribute. It provides functionality equivalent to ‘group by’ in ANSI SQL.
Copy Stream – This stream is created by copying another stream, but with a different retention rule.
Compute Stream – This stream allows you to apply a function to each row of data to get a new computed element for each row of the data stream.
Extend Stream – This stream is derived from another stream by adding column expressions.
Filter Stream – You can define a filter condition for this stream. Just like extend and compute streams, it derives a new stream from another stream, in this case by applying the filter condition.
Flex Stream – Significant flexibility in handling streaming data is achieved through custom-coded methods. Only this stream allows you to write your own methods to meet special needs.
Join Stream – Creates a new stream by joining two or more streams on some condition. Both inner and outer joins can be used.
Pattern Stream – Pattern matching rules are applied with this stream.
Union Stream – As the name suggests, this combines two or more streams with the same row data structure. Unlike the join stream, it includes all the data from all the participating streams.
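To see how a few of these stream types differ, here is a toy Java model of the source, filter and aggregate streams built on ordinary collections. It is a hypothetical, snapshot-style illustration, not Aleri’s API: derived results are recomputed on demand rather than updated incrementally as rows arrive, and rejecting an insert on an existing key or an update on a missing key is an assumption about the semantics.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.Predicate;

class StreamTypesSketch {
    // Source stream: rows keyed by id, like the id/text tweets stream.
    static final class SourceStream {
        final Map<Long, String> rows = new LinkedHashMap<>();

        // Assumption: insert is rejected when the key already exists.
        boolean insert(long id, String text) {
            if (rows.containsKey(id)) return false;
            rows.put(id, text);
            return true;
        }

        // Assumption: update is rejected when the key does not exist.
        boolean update(long id, String text) {
            if (!rows.containsKey(id)) return false;
            rows.put(id, text);
            return true;
        }

        boolean delete(long id) {
            if (!rows.containsKey(id)) return false;
            rows.remove(id);
            return true;
        }

        // Upsert: update the row if its key is present, otherwise insert it.
        void upsert(long id, String text) {
            rows.put(id, text);
        }
    }

    // Filter stream: only the rows of another stream that satisfy a condition.
    static Map<Long, String> filter(SourceStream src, Predicate<String> condition) {
        Map<Long, String> out = new LinkedHashMap<>();
        for (Map.Entry<Long, String> row : src.rows.entrySet()) {
            if (condition.test(row.getValue())) out.put(row.getKey(), row.getValue());
        }
        return out;
    }

    // Aggregate stream: one summary record (here a count) per group, like GROUP BY.
    static Map<String, Long> countBy(SourceStream src, Function<String, String> groupKey) {
        Map<String, Long> counts = new TreeMap<>();
        for (String text : src.rows.values()) {
            counts.merge(groupKey.apply(text), 1L, Long::sum);
        }
        return counts;
    }
}
```

For example, after `upsert(1, "lol that was fun")` and `upsert(2, "nice day :)")`, `filter(s, text -> text.contains("lol"))` keeps only row 1, which mirrors the lolFilter stream used later in this post.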
Using some of these streams and the publication API of Aleri, I will demonstrate the segregation of the Twitter live feed into two different streams. The Twitter live feed is consumed by a listener from the Twitter4J library. If you just want to try the Twitter4J library first, please follow my earlier post ‘Tracking user sentiments on Twitter’. The data received by the Twitter4J listener is fed to a source stream in our model by using the publication API from Aleri. In this exercise, we will separate out tweets based on their content. Building on the example from my previous post, we will divide the incoming stream into two streams: one gets any tweets that contain ‘lol’, and the other gets tweets with a smiley face “:)” in the text. First, let’s list the tasks we need to perform to make this a working example.
- Create a model with three streams
- Validate that the model is error-free
- Create a static data file
- Start the Aleri server and feed the static data file to the stream manually to confirm that the model works correctly.
- Write Java code to consume the Twitter feed. Use the publication API to publish the tweets to the Aleri platform.
- Run the demo and see the live data as it flows through various streams.
This image is a snapshot of Aleri Studio with the three streams: the one on the left, named “tweets”, is a source stream, and the two on the right, named “lolFilter” and “smileyFilter”, are of the filter type. The source stream accepts incoming data, while the filter streams receive the filtered data. Here is how I defined the filter condition for lolFilter: like(tweets.text, ‘%lol%’). tweets is the name of the stream and text is the field in the stream we are interested in. %lol% means select any tweets that have the string ‘lol’ in their content. Each stream has only two fields, id and text, which map to the id and text of the message sent by Twitter. Once you define the model, you can check it for errors by clicking the check mark in the ribbon at the top. Errors, if any, will show up in the panel at the bottom right of the image. Once your model is error-free, it’s time to test it.
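The like() condition uses SQL-style wildcards, where ‘%’ matches any run of characters. The hypothetical Java helper below translates such a pattern into a regular expression, which can be handy for checking test data offline before feeding it to the server. It assumes a case-sensitive match and supports only ‘%’ and ‘_’ (no escape character); Aleri’s own like() may differ in those details. The smiley filter would presumably use ‘%:)%’ in the same way.

```java
import java.util.regex.Pattern;

class LikeFilterSketch {
    // Translates a LIKE pattern into a regex: '%' -> ".*", '_' -> ".", everything else literal.
    static Pattern toRegex(String likePattern) {
        StringBuilder regex = new StringBuilder();
        StringBuilder literal = new StringBuilder();
        for (char ch : likePattern.toCharArray()) {
            if (ch == '%' || ch == '_') {
                // Quote the literal run collected so far so characters like ')' or '.' stay literal.
                regex.append(Pattern.quote(literal.toString()));
                literal.setLength(0);
                regex.append(ch == '%' ? ".*" : ".");
            } else {
                literal.append(ch);
            }
        }
        regex.append(Pattern.quote(literal.toString()));
        // DOTALL lets the wildcards span line breaks inside a tweet.
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    // True when the whole text matches, as with like(tweets.text, '%lol%').
    static boolean like(String text, String likePattern) {
        return toRegex(likePattern).matcher(text).matches();
    }
}
```

`like("that was lol funny", "%lol%")` is true, `like("LOL", "%lol%")` is false under the case-sensitive assumption, and `Pattern.quote` keeps characters such as ‘)’ in ‘%:)%’ from being read as regex syntax.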
The following image shows the test interface of the studio. Try running your model with a static data file first. The small red square at the top indicates that the Aleri server is currently running. The console window at the bottom right shows server messages, such as successful starts and stops. The Run-test tab in the left pane is where you pick a static data file to feed the source stream. The pane on the right shows all the currently running streams and the live data processed by them.
The following listing shows the format of the data file used to test the model:
tweets ALERI_OPS="i" id="1" text="324test 1234" ;
tweets ALERI_OPS="i" id="2" text="test 12345";
tweets ALERI_OPS="i" id="3" text="test 1234666" ;
tweets ALERI_OPS="i" id="4" text="test 1234888" ;
tweets ALERI_OPS="i" id="5" text="test 1234999" ;
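Typing a larger test file by hand gets tedious. The hypothetical helper below generates records in the same layout as the sample data, one per line. `inserts` only emits the "i" (insert) opcode seen in the sample, and `record` refuses text containing double quotes, since the escaping rules for this file format are not covered here.

```java
import java.util.ArrayList;
import java.util.List;

class TestDataFileSketch {
    // Formats one record: stream name, ALERI_OPS opcode, then field="value" pairs.
    static String record(String stream, String op, long id, String text) {
        if (text.indexOf('"') >= 0) {
            // Escaping of embedded quotes is not covered, so refuse such text.
            throw new IllegalArgumentException("text contains a double quote: " + text);
        }
        return stream + " ALERI_OPS=\"" + op + "\" id=\"" + id + "\" text=\"" + text + "\" ;";
    }

    // Builds insert ("i") records with ids 1..n for the given texts.
    static List<String> inserts(String stream, List<String> texts) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < texts.size(); i++) {
            lines.add(record(stream, "i", i + 1, texts.get(i)));
        }
        return lines;
    }
}
```

Passing the result to `java.nio.file.Files.write(Paths.get("tweets_test.dat"), lines)` writes a file that can then be picked in the Run-test tab; the file name is arbitrary.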
The source code for this exercise is at the bottom.
Remember that you need to have the Twitter4J library in the build path and the Aleri server running before you run the program. Because I have not added a timer to the execution thread, the only way to stop the execution is to abort it. For brevity, I have removed all exception handling and logging. The code uses only the publishing part of Aleri’s pub/sub API; I will demonstrate the subscription side of the API in my next blog post.
package com.sybase.aleri;

import java.io.IOException;
import java.util.Collection;
import java.util.Vector;

import twitter4j.TwitterException;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;

import com.aleri.pubsub.SpGatewayConstants;
import com.aleri.pubsub.SpPlatform;
import com.aleri.pubsub.SpPlatformParms;
import com.aleri.pubsub.SpPlatformStatus;
import com.aleri.pubsub.SpPublication;
import com.aleri.pubsub.SpStream;
import com.aleri.pubsub.SpStreamDataRecord;
import com.aleri.pubsub.SpStreamDefinition;
import com.aleri.pubsub.impl.SpFactory;

public class TwitterTest_2 {

    // important objects from the publish API
    static SpStream stream;
    static SpPlatformStatus platformStatus;
    static SpPublication pub;

    // make sure that the Aleri server is running before running this program
    static {
        // creates the publishing platform
        createPlatform();
    }

    public static void main(String[] args) throws TwitterException, IOException {
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(true);
        // use your Twitter id and password
        cb.setUser("Your user name");
        cb.setPassword("Your Password");

        // creating the twitter4j listener
        Configuration cfg = cb.build();
        TwitterStream twitterStream = new TwitterStreamFactory(cfg).getInstance();
        // StatusListener_1 is not part of this listing; presumably it implements
        // twitter4j's StatusListener and hands each tweet's id and text to publish()
        StatusListener_1 listener = new StatusListener_1();
        twitterStream.addListener(listener);
        // runs the sample stream that comes with twitter4j
        twitterStream.sample();
    }

    private static int createPlatform() {
        // Aleri platform configuration; a better alternative is to use a properties file
        String host = "localhost";
        int port = 22000;
        // Aleri is configured to run with empty user id and password strings
        String user = "";
        String password = "";
        // name of the source stream, the one that gets the data from twitter4j
        String streamName = "tweets";
        String name = "TwitterTest_2";

        SpPlatformParms parms = SpFactory.createPlatformParms(host, port, user, password, false, false);
        platformStatus = SpFactory.createPlatformStatus();
        SpPlatform sp = SpFactory.createPlatform(parms, platformStatus);
        stream = sp.getStream(streamName);
        pub = sp.createPublication(name, platformStatus);

        // then get the stream definition containing the schema information
        SpStreamDefinition sdef = stream.getDefinition();
        /*
        int numFieldsInRecord = sdef.getNumColumns();
        Vector colTypes = sdef.getColumnTypes();
        Vector colNames = sdef.getColumnNames();
        */
        return 0;
    }

    static SpStream getStream() {
        return stream;
    }

    static SpPlatformStatus getPlatformStatus() {
        return platformStatus;
    }

    static SpPublication getPublication() {
        return pub;
    }

    static int publish(SpStream stream, SpPlatformStatus platformStatus,
                       SpPublication pub, Collection fieldData) {
        int rc = pub.start();
        SpStreamDataRecord sdr = SpFactory.createStreamDataRecord(stream, fieldData,
                SpGatewayConstants.SO_UPSERT, SpGatewayConstants.SF_NULLFLAG, platformStatus);
        Collection dataSet = new Vector();
        dataSet.add(sdr);
        System.out.println("\nAttempting to publish the data set to the Platform for stream <"
                + stream.getName() + ">.");
        rc = pub.publishTransaction(dataSet, SpGatewayConstants.SO_UPSERT,
                SpGatewayConstants.SF_NULLFLAG, 1);
        // commit blocks the thread until the data is consumed by the platform
        System.out.println("before commit() call to the Platform.");
        rc = pub.commit();
        return rc;
    }
}
Reference: Aleri – Complex Event Processing – Part I, Understanding Aleri – Complex Event Processing – Part II from our JCG partner Mahesh Gadgil at the Simple yet Practical blog.