Flume: Gathering customer product search clicks data using Apache Flume
This post covers how to use Apache Flume to gather customer product search clicks and store the information using Hadoop and ElasticSearch sinks. The data may consist of different product search events such as filtering on different facets, sorting information, pagination information, and further the products viewed and products marked as favourite by customers. In later posts we will analyze the data further and use the same information for display and analytics.
Product Search Functionality
Any eCommerce platform offers different products to customers, and search functionality is one of its basics. Allowing the user guided navigation using different facets/filters, or free-text search of the content, is a standard part of any existing search functionality.
SearchQueryInstruction
Consider a similar scenario where a customer can search for a product, and we capture the product search behavior with the following information:
public class SearchQueryInstruction implements Serializable {
    @JsonIgnore
    private final String _eventIdSuffix;
    private String eventId;
    private String hostedMachineName;
    private String pageUrl;
    private Long customerId;
    private String sessionId;
    private String queryString;
    private String sortOrder;
    private Long pageNumber;
    private Long totalHits;
    private Long hitsShown;
    private final Long createdTimeStampInMillis;
    private String clickedDocId;
    private Boolean favourite;
    @JsonIgnore
    private Map<String, Set<String>> filters;
    @JsonProperty(value = "filters")
    private List<FacetFilter> _filters;

    public SearchQueryInstruction() {
        _eventIdSuffix = UUID.randomUUID().toString();
        createdTimeStampInMillis = new Date().getTime();
    }
    ...
    ...
    private static class FacetFilter implements Serializable {
        private String code;
        private String value;

        public FacetFilter(String code, String value) {
            this.code = code;
            this.value = value;
        }
        ...
        ...
    }
}
Further source information is available at SearchQueryInstruction. The data is serialized in JSON format so that it can be used directly with ElasticSearch for display purposes.
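Since the class carries Jackson annotations (@JsonIgnore, @JsonProperty), the conversion to JSON can be done with Jackson's ObjectMapper. A minimal sketch, assuming Jackson 2.x on the classpath (the lowercase field names in the sample data below suggest the actual project also applies a naming strategy):

import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: serialize a SearchQueryInstruction to JSON before handing it
// to the embedded Flume agent. writeValueAsString throws JsonProcessingException.
ObjectMapper mapper = new ObjectMapper();
SearchQueryInstruction instruction = new SearchQueryInstruction();
// ... populate customerId, sessionId, queryString, filters, etc.
String jsonString = mapper.writeValueAsString(instruction);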
Sample data showing what the click information looks like based on user clicks. The data is converted to JSON format before being sent to the embedded Flume agent.
{"eventid":"629e9b5f-ff4a-4168-8664-6c8df8214aa7-1399386809805-24","hostedmachinename":"192.168.182.1330","pageurl":"http://jaibigdata.com/5","customerid":24,"sessionid":"648a011d-570e-48ef-bccc-84129c9fa400","querystring":null,"sortorder":"desc","pagenumber":3,"totalhits":28,"hitsshown":7,"createdtimestampinmillis":1399386809805,"clickeddocid":"41","favourite":null,"eventidsuffix":"629e9b5f-ff4a-4168-8664-6c8df8214aa7","filters":[{"code":"searchfacettype_color_level_2","value":"Blue"},{"code":"searchfacettype_age_level_2","value":"12-18 years"}]} {"eventid":"648b5cf7-7ca9-4664-915d-23b0d45facc4-1399386809782-298","hostedmachinename":"192.168.182.1333","pageurl":"http://jaibigdata.com/4","customerid":298,"sessionid":"7bf042ea-526a-4633-84cd-55e0984ea2cb","querystring":"queryString48","sortorder":"desc","pagenumber":0,"totalhits":29,"hitsshown":19,"createdtimestampinmillis":1399386809782,"clickeddocid":"9","favourite":null,"eventidsuffix":"648b5cf7-7ca9-4664-915d-23b0d45facc4","filters":[{"code":"searchfacettype_color_level_2","value":"Green"}]} {"eventid":"74bb7cfe-5f8c-4996-9700-0c387249a134-1399386809799-440","hostedmachinename":"192.168.182.1330","pageurl":"http://jaibigdata.com/1","customerid":440,"sessionid":"940c9a0f-a9b2-4f1d-b114-511ac11bf2bb","querystring":"queryString16","sortorder":"asc","pagenumber":3,"totalhits":5,"hitsshown":32,"createdtimestampinmillis":1399386809799,"clickeddocid":null,"favourite":null,"eventidsuffix":"74bb7cfe-5f8c-4996-9700-0c387249a134","filters":[{"code":"searchfacettype_brand_level_2","value":"Apple"}]} {"eventid":"9da05913-84b1-4a74-89ed-5b6ec6389cce-1399386809828-143","hostedmachinename":"192.168.182.1332","pageurl":"http://jaibigdata.com/1","customerid":143,"sessionid":"08a4a36f-2535-4b0e-b86a-cf180202829b","querystring":null,"sortorder":"desc","pagenumber":0,"totalhits":21,"hitsshown":34,"createdtimestampinmillis":1399386809828,"clickeddocid":"38","favourite":true,"eventidsuffix":"9da05913-84b1-4a74-89ed-5b6ec6389cce","filters":[{"code":"searchfacettype_color_level_2","value":"Blue"},{"code":"product_price_range","value":"10.0 - 20.0"}]}
Apache Flume
Apache Flume is used to gather and aggregate the data. Here an embedded Flume agent is used to capture the SearchQueryInstruction events. In a real scenario, based on the usage, you can do one of the following:
- Use an embedded agent to collect the data
- Push the data from the page through a REST API to a backend API service dedicated to event collection (see the sketch after this list)
- Use the application logging functionality to log all search events and tail the log file to collect the data
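As an illustration of the second option, a backend endpoint could accept the click event as JSON and hand it to a service wrapping the Flume agent. This is a hypothetical sketch using Spring MVC; the controller and service names are not from the original project:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController // Spring 4+; use @Controller + @ResponseBody on older versions
public class SearchClickEventController {

    // Hypothetical service that serializes the event and puts it on the Flume agent
    @Autowired
    private SearchEventsFlumeService searchEventsFlumeService;

    @RequestMapping(value = "/clickevents", method = RequestMethod.POST)
    public ResponseEntity<Void> collect(@RequestBody SearchQueryInstruction instruction) {
        searchEventsFlumeService.send(instruction);
        return new ResponseEntity<Void>(HttpStatus.ACCEPTED);
    }
}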
Consider a scenario where, depending on the application, multiple web/app servers send event data to a collector Flume agent. As depicted in the diagram below, the search click events are collected from multiple web/app servers, and a collector/consolidator agent gathers the data from all of them. The data is then divided, based on a selector using the multiplexing strategy, to store in Hadoop HDFS while also directing relevant data (e.g. recently viewed items) to ElasticSearch.
Embedded Flume Agent
The Embedded Flume Agent allows us to include the Flume agent within the application itself, collect the data, and send it on to the collector agent.
private static EmbeddedAgent agent;

private void createAgent() {
    final Map<String, String> properties = new HashMap<String, String>();
    properties.put("channel.type", "memory");
    properties.put("channel.capacity", "100000");
    properties.put("channel.transactionCapacity", "1000");
    properties.put("sinks", "sink1");
    properties.put("sink1.type", "avro");
    properties.put("sink1.hostname", "localhost");
    properties.put("sink1.port", "44444");
    properties.put("processor.type", "default");
    try {
        agent = new EmbeddedAgent("searchqueryagent");
        agent.configure(properties);
        agent.start();
    } catch (final Exception ex) {
        LOG.error("Error creating agent!", ex);
    }
}
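Once the agent is started, events can be put on it directly. A minimal sketch, assuming jsonString holds the serialized SearchQueryInstruction and a State header (VIEWED/FAVOURITE) is set so the multiplexing selector used later can route the event:

import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.event.EventBuilder;

Map<String, String> headers = new HashMap<String, String>();
headers.put("State", "VIEWED"); // used by the multiplexing channel selector
Event event = EventBuilder.withBody(jsonString, Charset.forName("UTF-8"), headers);
try {
    agent.put(event);
} catch (EventDeliveryException e) {
    LOG.error("Error putting event on the embedded agent!", e);
}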
Store Search Events Data
Flume provides multiple sink options to store the data for future analysis. As shown in the diagram, we will take the scenario of storing the data in Apache Hadoop, and also in ElasticSearch for the recently-viewed-items functionality.
Hadoop Sink
The HDFS sink allows the data to be stored permanently in HDFS so it can be analyzed later for analytics.
Based on the incoming event data, let's say we want to store it on an hourly basis. The "/searchevents/2014/05/15/16" directory will store all incoming events for hour 16.
private HDFSEventSink sink;

sink = new HDFSEventSink();
sink.setName("HDFSEventSink-" + UUID.randomUUID());

channel = new MemoryChannel();
Map<String, String> channelParamters = new HashMap<>();
channelParamters.put("capacity", "100000");
channelParamters.put("transactionCapacity", "1000");
Context channelContext = new Context(channelParamters);
Configurables.configure(channel, channelContext);
channel.setName("HDFSEventSinkChannel-" + UUID.randomUUID());

Map<String, String> paramters = new HashMap<>();
paramters.put("hdfs.type", "hdfs");
String hdfsBasePath = hadoopClusterService.getHDFSUri() + "/searchevents";
paramters.put("hdfs.path", hdfsBasePath + "/%Y/%m/%d/%H");
paramters.put("hdfs.filePrefix", "searchevents");
paramters.put("hdfs.fileType", "DataStream");
paramters.put("hdfs.rollInterval", "0");
paramters.put("hdfs.rollSize", "0");
paramters.put("hdfs.idleTimeout", "1");
paramters.put("hdfs.rollCount", "0");
paramters.put("hdfs.batchSize", "1000");
paramters.put("hdfs.useLocalTimeStamp", "true");

Context sinkContext = new Context(paramters);
sink.configure(sinkContext);
sink.setChannel(channel);

sink.start();
channel.start();
Check FlumeHDFSSinkServiceImpl.java for the detailed start/stop of the HDFS sink.
The sample data below is stored in Hadoop as:
Check:hdfs://localhost.localdomain:54321/searchevents/2014/05/06/16/searchevents.1399386809864
body is:{"eventid":"e8470a00-c869-4a90-89f2-f550522f8f52-1399386809212-72","hostedmachinename":"192.168.182.1334","pageurl":"http://jaibigdata.com/0","customerid":72,"sessionid":"7871a55c-a950-4394-bf5f-d2179a553575","querystring":null,"sortorder":"desc","pagenumber":0,"totalhits":8,"hitsshown":44,"createdtimestampinmillis":1399386809212,"clickeddocid":"23","favourite":null,"eventidsuffix":"e8470a00-c869-4a90-89f2-f550522f8f52","filters":[{"code":"searchfacettype_brand_level_2","value":"Apple"},{"code":"searchfacettype_color_level_2","value":"Blue"}]}
body is:{"eventid":"2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0-1399386809743-61","hostedmachinename":"192.168.182.1330","pageurl":"http://jaibigdata.com/0","customerid":61,"sessionid":"78286f6d-cc1e-489c-85ce-a7de8419d628","querystring":"queryString59","sortorder":"asc","pagenumber":3,"totalhits":32,"hitsshown":9,"createdtimestampinmillis":1399386809743,"clickeddocid":null,"favourite":null,"eventidsuffix":"2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0","filters":[{"code":"searchfacettype_age_level_2","value":"0-12 years"}]}
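To verify what has landed in HDFS, the stored files can also be listed programmatically. A minimal sketch using the Hadoop FileSystem API, assuming the HDFS URI and hourly path shown above:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

// List all search event files stored under a given hour directory
FileSystem fs = FileSystem.get(URI.create("hdfs://localhost.localdomain:54321"), new Configuration());
RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/searchevents/2014/05/06/16"), true);
while (files.hasNext()) {
    System.out.println(files.next().getPath());
}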
ElasticSearch Sink
The ElasticSearch sink is used for display purposes, to show recently viewed items to the end user. It automatically creates a daily index of recently viewed items, and this functionality can be used to display a customer's recently viewed items.
Let’s say you already have an ES instance running at localhost:9310.
private ElasticSearchSink sink;

sink = new ElasticSearchSink();
sink.setName("ElasticSearchSink-" + UUID.randomUUID());

channel = new MemoryChannel();
Map<String, String> channelParamters = new HashMap<>();
channelParamters.put("capacity", "100000");
channelParamters.put("transactionCapacity", "1000");
Context channelContext = new Context(channelParamters);
Configurables.configure(channel, channelContext);
channel.setName("ElasticSearchSinkChannel-" + UUID.randomUUID());

Map<String, String> paramters = new HashMap<>();
paramters.put(ElasticSearchSinkConstants.HOSTNAMES, "127.0.0.1:9310");
String indexNamePrefix = "recentlyviewed";
paramters.put(ElasticSearchSinkConstants.INDEX_NAME, indexNamePrefix);
paramters.put(ElasticSearchSinkConstants.INDEX_TYPE, "clickevent");
paramters.put(ElasticSearchSinkConstants.CLUSTER_NAME, "jai-testclusterName");
paramters.put(ElasticSearchSinkConstants.BATCH_SIZE, "10");
paramters.put(ElasticSearchSinkConstants.SERIALIZER, ElasticSearchJsonBodyEventSerializer.class.getName());

Context sinkContext = new Context(paramters);
sink.configure(sinkContext);
sink.setChannel(channel);

sink.start();
channel.start();
Check FlumeESSinkServiceImpl.java for details on starting/stopping the ElasticSearch sink.
Sample data in ElasticSearch is stored as:
{timestamp=1399386809743, body={pageurl=http://jaibigdata.com/0, querystring=queryString59, pagenumber=3, hitsshown=9, hostedmachinename=192.168.182.1330, createdtimestampinmillis=1399386809743, sessionid=78286f6d-cc1e-489c-85ce-a7de8419d628, eventid=2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0-1399386809743-61, totalhits=32, clickeddocid=null, customerid=61, sortorder=asc, favourite=null, eventidsuffix=2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0, filters=[{value=0-12 years, code=searchfacettype_age_level_2}]}, eventId=2a4c1e1b-d2c9-4fe2-b38d-9b7d32feb4e0}
{timestamp=1399386809757, body={pageurl=http://jaibigdata.com/1, querystring=null, pagenumber=1, hitsshown=34, hostedmachinename=192.168.182.1330, createdtimestampinmillis=1399386809757, sessionid=e6a3fd51-fe07-4e21-8574-ce5ab8bfbd68, eventid=fe5279b7-0bce-4e2b-ad15-8b94107aa792-1399386809757-134, totalhits=9, clickeddocid=22, customerid=134, sortorder=desc, favourite=null, eventidsuffix=fe5279b7-0bce-4e2b-ad15-8b94107aa792, filters=[{value=Blue, code=searchfacettype_color_level_2}]}, State=VIEWED, eventId=fe5279b7-0bce-4e2b-ad15-8b94107aa792}
{timestamp=1399386809765, body={pageurl=http://jaibigdata.com/0, querystring=null, pagenumber=4, hitsshown=2, hostedmachinename=192.168.182.1331, createdtimestampinmillis=1399386809765, sessionid=29864de8-5708-40ab-a78b-4fae55698b01, eventid=886e9a28-4c8c-4e8c-a866-e86f685ecc54-1399386809765-317, totalhits=2, clickeddocid=null, customerid=317, sortorder=asc, favourite=null, eventidsuffix=886e9a28-4c8c-4e8c-a866-e86f685ecc54, filters=[{value=0-12 years, code=searchfacettype_age_level_2}, {value=0.0 - 10.0, code=product_price_range}]}, eventId=886e9a28-4c8c-4e8c-a866-e86f685ecc54}
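To actually display the recently viewed items for a customer, the index can be queried by customer id. A minimal sketch using the ElasticSearch 1.x Java client, assuming client is a TransportClient connected to the cluster configured above; the field paths are assumptions based on the sample documents and may need adjusting to your actual mapping:

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;

// Latest 10 click events for customer 61, newest first
SearchResponse response = client.prepareSearch("recentlyviewed")
        .setTypes("clickevent")
        .setQuery(QueryBuilders.termQuery("body.customerid", 61)) // assumed field path
        .addSort("timestamp", SortOrder.DESC)
        .setSize(10)
        .execute().actionGet();
for (SearchHit hit : response.getHits().getHits()) {
    System.out.println(hit.getSourceAsString());
}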
ElasticSearchJsonBodyEventSerializer
This serializer controls how the data will be indexed in ElasticSearch. Update the event serializer as per your own strategy for how the data should be indexed.
public class ElasticSearchJsonBodyEventSerializer implements ElasticSearchEventSerializer {
    @Override
    public BytesStream getContentBuilder(final Event event) throws IOException {
        final XContentBuilder builder = jsonBuilder().startObject();
        appendBody(builder, event);
        appendHeaders(builder, event);
        return builder;
    }
    ...
    ...
}
Check ElasticSearchJsonBodyEventSerializer.java to configure the serializer to index data.
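The appendBody/appendHeaders methods are elided above; the repository class has the actual implementation. As a rough illustration of the idea only (matching the sample output, where the JSON body is nested under body and the Flume headers appear as top-level fields), they could look something like:

private void appendBody(XContentBuilder builder, Event event) throws IOException {
    // Write the JSON event body as a nested "body" object (illustrative sketch)
    builder.rawField("body", event.getBody());
}

private void appendHeaders(XContentBuilder builder, Event event) throws IOException {
    // Copy Flume event headers (timestamp, State, eventId, ...) as top-level fields
    for (Map.Entry<String, String> header : event.getHeaders().entrySet()) {
        builder.field(header.getKey(), header.getValue());
    }
}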
Let’s take a Java example of creating a Flume source to process the above SearchQueryInstruction in test cases and store the data.
Avro Source with channel selector
For testing purposes, let’s create the Avro source to redirect data to the relevant sinks based on the Flume multiplexing feature.
//Avro source to start at below port and process incoming data.
private AvroSource avroSource;

final Map<String, String> properties = new HashMap<String, String>();
properties.put("type", "avro");
properties.put("bind", "localhost");
properties.put("port", "44444");

avroSource = new AvroSource();
avroSource.setName("AvroSource-" + UUID.randomUUID());

Context sourceContext = new Context(properties);
avroSource.configure(sourceContext);

ChannelSelector selector = new MultiplexingChannelSelector();

//Channels from above services
Channel ESChannel = flumeESSinkService.getChannel();
Channel HDFSChannel = flumeHDFSSinkService.getChannel();
List<Channel> channels = new ArrayList<>();
channels.add(ESChannel);
channels.add(HDFSChannel);
selector.setChannels(channels);

final Map<String, String> selectorProperties = new HashMap<String, String>();
selectorProperties.put("type", "multiplexing");
selectorProperties.put("header", "State");
selectorProperties.put("mapping.VIEWED", HDFSChannel.getName() + " " + ESChannel.getName());
selectorProperties.put("mapping.FAVOURITE", HDFSChannel.getName() + " " + ESChannel.getName());
selectorProperties.put("default", HDFSChannel.getName());

Context selectorContext = new Context(selectorProperties);
selector.configure(selectorContext);

ChannelProcessor cp = new ChannelProcessor(selector);
avroSource.setChannelProcessor(cp);
avroSource.start();
Check FlumeAgentServiceImpl.java to directly store data to the sinks configured above, or even to log all data to a log file.
Standalone Flume/Hadoop/ElasticSearch environment
The application can be used to generate SearchQueryInstruction data, and you can use your own standalone environment to process the data further. If you already have a running Flume/Hadoop/ElasticSearch environment, use the settings below to process the data further.
The following configuration (flume.conf) can also be used if you already have a Flume instance running:
# Name the components on this agent
searcheventscollectoragent.sources = eventsavrosource
searcheventscollectoragent.sinks = hdfssink essink
searcheventscollectoragent.channels = hdfschannel eschannel

# Bind the source and sinks to the channels
searcheventscollectoragent.sources.eventsavrosource.channels = hdfschannel eschannel
searcheventscollectoragent.sinks.hdfssink.channel = hdfschannel
searcheventscollectoragent.sinks.essink.channel = eschannel

# Avro source. This is where the data will be sent to.
searcheventscollectoragent.sources.eventsavrosource.type = avro
searcheventscollectoragent.sources.eventsavrosource.bind = 0.0.0.0
searcheventscollectoragent.sources.eventsavrosource.port = 44444
searcheventscollectoragent.sources.eventsavrosource.selector.type = multiplexing
searcheventscollectoragent.sources.eventsavrosource.selector.header = State
searcheventscollectoragent.sources.eventsavrosource.selector.mapping.VIEWED = hdfschannel eschannel
searcheventscollectoragent.sources.eventsavrosource.selector.mapping.default = hdfschannel

# Use channels which buffer events in memory. This keeps all incoming events in memory;
# switch to a file channel if the data volume is too high for memory.
searcheventscollectoragent.channels.hdfschannel.type = memory
searcheventscollectoragent.channels.hdfschannel.capacity = 100000
searcheventscollectoragent.channels.hdfschannel.transactionCapacity = 1000
searcheventscollectoragent.channels.eschannel.type = memory
searcheventscollectoragent.channels.eschannel.capacity = 100000
searcheventscollectoragent.channels.eschannel.transactionCapacity = 1000

# HDFS sink. Store events directly in the Hadoop file system.
searcheventscollectoragent.sinks.hdfssink.type = hdfs
searcheventscollectoragent.sinks.hdfssink.hdfs.path = hdfs://localhost.localdomain:54321/searchevents/%Y/%m/%d/%H
searcheventscollectoragent.sinks.hdfssink.hdfs.filePrefix = searchevents
searcheventscollectoragent.sinks.hdfssink.hdfs.fileType = DataStream
searcheventscollectoragent.sinks.hdfssink.hdfs.rollInterval = 0
searcheventscollectoragent.sinks.hdfssink.hdfs.rollSize = 134217728
searcheventscollectoragent.sinks.hdfssink.hdfs.idleTimeout = 60
searcheventscollectoragent.sinks.hdfssink.hdfs.rollCount = 0
searcheventscollectoragent.sinks.hdfssink.hdfs.batchSize = 10
searcheventscollectoragent.sinks.hdfssink.hdfs.useLocalTimeStamp = true

# ElasticSearch sink
searcheventscollectoragent.sinks.essink.type = elasticsearch
searcheventscollectoragent.sinks.essink.hostNames = 127.0.0.1:9310
searcheventscollectoragent.sinks.essink.indexName = recentlyviewed
searcheventscollectoragent.sinks.essink.indexType = clickevent
searcheventscollectoragent.sinks.essink.clusterName = jai-testclusterName
searcheventscollectoragent.sinks.essink.batchSize = 10
searcheventscollectoragent.sinks.essink.ttl = 5
searcheventscollectoragent.sinks.essink.serializer = org.jai.flume.sinks.elasticsearch.serializer.ElasticSearchJsonBodyEventSerializer
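With this standalone agent running, application/web servers can push events to its Avro source using the Flume RPC client API. A minimal sketch, assuming jsonString is the serialized SearchQueryInstruction and the host/port configured above:

import java.nio.charset.Charset;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

RpcClient client = RpcClientFactory.getDefaultInstance("localhost", 44444);
try {
    Event event = EventBuilder.withBody(jsonString, Charset.forName("UTF-8"));
    event.getHeaders().put("State", "VIEWED"); // drives the multiplexing selector
    client.append(event);
} catch (EventDeliveryException e) {
    // handle/retry; recreate the client if the connection is broken
} finally {
    client.close();
}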
To test how the search query instructions behave on your existing Hadoop instance, set up the Hadoop and ElasticSearch instances separately. The application uses the Cloudera Hadoop distribution 5.0 for testing purposes.
In later posts we will cover how to analyze the generated data further:
- Using Hive to query the data for top customer queries and the number of times a product is viewed
- Using ElasticSearch Hadoop to index customer top queries and product-view data
- Using Pig to count the total number of unique customers
- Using Oozie to schedule coordinated jobs for the Hive partitions and a bundle job to index the data to ElasticSearch
Reference: Flume: Gathering customer product search clicks data using Apache Flume from our JCG partner Jaibeer Malik at the Jai’s Weblog blog.