Processing real-time data with Storm, Kafka and ElasticSearch – Part 2

This is the second part of the article series: Processing real-time data with Storm, Kafka, and ElasticSearch.

1. Introduction

In the first part we described the problem and how we are going to solve it. To refresh your memory, the plan is to create a Data Reduction System of historic flight data (which you can freely download from here). We will build our DRS using the following technology stack: Apache Storm → Apache Kafka → ElasticSearch → (Visualisation: Kibana, OpenLayers, OpenMap, etc.).

We already explained the basics of ElasticSearch and how it works in the previous article. In this article, we shall learn how to perform searches in ElasticSearch.

2. CRUD commands

You may be familiar with the acronym CRUD from databases:

  • C – Create
  • R – Retrieve or Read
  • U – Update
  • D – Delete

The following table maps each CRUD command to its respective ElasticSearch HTTP/REST command. These commands apply to indices as well as documents.

CRUD command    HTTP/REST command
Create          PUT or POST
Read            GET
Update          PUT or POST
Delete          DELETE
Table 1. CRUD vs ElasticSearch REST commands

So, let’s start Kibana as we learned in the previous article, and navigate to Kibana’s console.

2.1 Create Indices

To create an index named flight, issue the following and then check the cluster’s health:

PUT /flight
GET /_cluster/health

Notice that now the cluster’s health has changed from green to yellow. This occurs because we are running only a single instance of Elasticsearch. A single-node cluster is fully functional, but data cannot be replicated to other nodes to provide resiliency. Replica shards must be allocated to a different node than their primary shard for the cluster status to be green. If the cluster status is red, some data are unavailable.

To fix this, you need to create another installation of ElasticSearch (on the same or, preferably, on another machine) and change node.name inside elasticsearch.yml; cluster.name must remain the same in both instances (default is elasticsearch). An alternative is to pass the configuration parameters to ElasticSearch on the command line, e.g.

bin/elasticsearch -Enode.name=node-2 -Epath.data=./node-2/data -Epath.logs=./node-2/logs

To list the indices together with their health, issue:

GET /_cat/indices?v

health status index  uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   flight w696u4y3SYWuGW--8VzW6Q   1   1          0            0       208b           208b

Our primary shard has a replica shard (rep = 1), but the replica is not allocated to any node.

GET /_cat/shards?v

index    shard prirep state      docs   store ip        node
flight       0      p STARTED       0    208b 127.0.0.1 MacBook-Pro.local
flight       0      r UNASSIGNED                        

You notice that the replica shard is unassigned (prirep = r means replica).
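The relationship between shard allocation and cluster health described above can be sketched in a few lines of Python. This is a simplified model and not how ElasticSearch computes health internally: the function name cluster_status and the (prirep, state) tuple layout are assumptions made for illustration, and transitional states such as INITIALIZING are treated like UNASSIGNED.

```python
def cluster_status(shards):
    """Derive a health colour from (prirep, state) pairs.

    prirep is "p" for a primary shard and "r" for a replica shard,
    mirroring the columns of GET /_cat/shards?v.
    """
    shards = list(shards)
    # Any primary that is not started means some data are unavailable.
    if any(kind == "p" and state != "STARTED" for kind, state in shards):
        return "red"
    # All primaries are fine, but at least one replica is not allocated.
    if any(kind == "r" and state != "STARTED" for kind, state in shards):
        return "yellow"
    return "green"


# The single-node situation shown in the shard listing above.
single_node = [("p", "STARTED"), ("r", "UNASSIGNED")]
print(cluster_status(single_node))
```

With a second node running, the replica would be reported as STARTED and the same function would return green.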

2.2 Create Documents

Let’s add some test data to our index:

PUT /flight/_doc/1 
{
  "Icao":"A0835D",
  "Alt":2400,
  "Lat":39.984322,
  "Long":-82.925616
}

or as a curl command:

curl -X PUT "localhost:9200/flight/_doc/1?pretty" -H 'Content-Type: application/json' -d'
{
  "Icao":"A0835D",
  "Alt":2400,
  "Lat":39.984322,
  "Long":-82.925616
}'

The Content-Type header is required for the request to succeed. We created a new flight with ID = 1. We could also have used POST instead of PUT without passing an ID (POST /flight/_doc); in that case, ElasticSearch generates an ID for our document automatically. Searching the index afterwards (GET /flight/_search) returns the new document:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "flight",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "Icao" : "A0835D",
          "Alt" : 2400,
          "Lat" : 39.984322,
          "Long" : -82.925616
        }
      }
    ]
  }
}

The result document is inside the _source key.

2.3 Delete Documents

If you know the document’s ID:

DELETE /flight/_doc/1

2.4 Delete indices

To delete an index, simply issue:

DELETE /flight

2.5 Import bulk data

Our scenario will be to process flight data. Ideally, these data would come in real-time from a number of sensors (radars), but since this is a bit difficult to achieve, we shall use batch flight history data that you can download from here. An explanation of the various fields can be found here.

The file needs to end with an empty line. If not, then add one. Inside the directory where you downloaded the batch files, issue the following command (per .json file):

curl -H "Content-Type: application/x-ndjson" -XPOST http://localhost:9200/flight/_bulk --data-binary "@2016-07-01-1300Z.json"

Please note that the content type is "application/x-ndjson" (newline-delimited JSON), because the bulk body is a sequence of JSON lines rather than a single JSON document. Also, note that we send the data as binary (--data-binary) in order to keep the newlines. The file name is 2016-07-01-1300Z.json. Any pre-existing documents with the same IDs will be replaced by the ones in the .json file.

If you run this command it will fail. It would be too easy if it worked, wouldn’t it? ElasticSearch expects its .json documents to have a specific format:

{"index":{"_id":4800770}}
{"Rcvr":1,"HasSig":false,"Icao":"494102", "Bad":false,"Reg":"CS-PHB", ...}
...

This means that you have to convert each downloaded .json file to the above format, which consists of the following steps:

  • Add a line that starts with "index" above each actual data document
  • Move the "Id":<value> to {"_id":<value>}
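The two steps above could be automated along the following lines. This is only a minimal Python sketch, not the Java program of the next article: the function name to_bulk_ndjson is hypothetical, and it assumes that the flights have already been loaded as a list of dictionaries, each carrying an "Id" key. How the downloaded files are read into that list is left out.

```python
import json


def to_bulk_ndjson(flights):
    """Turn flight dictionaries into the body expected by the _bulk API."""
    lines = []
    for flight in flights:
        doc = dict(flight)        # copy, so the caller's data is untouched
        doc_id = doc.pop("Id")    # the "Id" value moves into the action line
        lines.append(json.dumps({"index": {"_id": doc_id}}))
        lines.append(json.dumps(doc))
    # Every line, including the last one, must end with a newline.
    return "".join(line + "\n" for line in lines)


sample = [{"Id": 4800770, "Rcvr": 1, "HasSig": False, "Icao": "494102"}]
print(to_bulk_ndjson(sample), end="")
```

The returned string can be written to a file and sent with the curl command shown earlier.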

If you don’t want to spend your time manually modifying the .json documents, in the next article we will develop a Java program that parses them and inserts the documents in ElasticSearch using ElasticSearch’s REST API. Till then, you may download a sample .json file with only a few flights from here.

2.6 Search queries

ElasticSearch is all about searching. It allows you to use search queries that match some criteria.

GET /flight/_search?pretty
{
  "query": {
    "match_all": {}
  }
}

The above search query matches all documents of index flight. It can also be simplified like so:

GET /flight/_search 

The following query searches for documents that match the given Icao.

GET /flight/_search?pretty
{
  "query": {
    "match": {
      "Icao": "A0835D"
    }
  }
}

It’s also possible to perform searches embedded in the request URL:

GET /flight/_search?q=Icao:A0835D 

This can also be written as:

GET /flight/_search?pretty
{
  "query": {
    "query_string": {
      "query": "Icao:A0835D"
    }
  }
}

Instead of "match" and "query_string" one can also use "term". Use "term" for exact matches (like boolean, numbers, dates, enumerated values, keywords etc.).

GET /flight/_search?pretty
{
  "query": {
    "term": {
      "Mil": true
    }
  }
}

You may also use "terms" to search an array of values.

You may search by "wildcard" and make use of the wildcards * and/or ? or by prefix.

GET /flight/_search?pretty
{
  "query": {
    "wildcard": {
      "Call": "NJ*"
    }
  }
}

Please note however that wildcard queries may be slow, especially if the wildcard is at the beginning of the search string.

You can also use regexp and provide a search string with regular expression special characters.

Or you can use "ids" to search an array of document IDs, or "range" to provide a range of values (using gt, gte, lt, lte for >, ≥, <, ≤ respectively).

GET /flight/_search?pretty
{
  "query": {
    "range": {
      "Year": {
        "gte": "2010",
        "lt": "2016"
      }
    }
  }
}

You can also use date math relative to an anchor date, e.g. "gte": "2010/01/01||-1y", which subtracts one year from the anchor date 1 January 2010 and therefore matches all dates from 1 January 2009 onwards. More information can be found here.
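For the query types that were mentioned above without an example (terms, prefix, regexp and ids), the request bodies have the following shape. The sketch builds them as Python dictionaries and prints the JSON that would be sent to /flight/_search. The helper name search_body, the field values and the regular expression are illustrative assumptions, and the .keyword sub-fields assume the default dynamic mapping for strings that is described in section 3.

```python
import json


def search_body(query_type, clause):
    """Wrap a single query clause in the top-level "query" object."""
    return {"query": {query_type: clause}}


bodies = {
    # Match documents whose field equals any value of the array.
    "terms": search_body("terms", {"Icao.keyword": ["A0835D", "494102"]}),
    # Match values that start with the given characters.
    "prefix": search_body("prefix", {"Call.keyword": {"value": "NJ"}}),
    # Match values against a regular expression (hypothetical pattern).
    "regexp": search_body("regexp", {"Call.keyword": {"value": "NJ[0-9]+"}}),
    # Match documents by their IDs.
    "ids": search_body("ids", {"values": ["1", "4800770"]}),
}

for name, body in bodies.items():
    print(name, json.dumps(body))
```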

To search for documents that have non-null values in a field:

GET /flight/_search?pretty
{
  "query": {
    "exists": {
      "field": "Cou"
    }
  }
}

You may also use "match_phrase" to match a phrase or "multi_match" to match many fields:

GET /flight/_search?pretty
{
  "query": {
    "multi_match": {
      "query": false,
      "fields": ["Mil", "Gnd"]
    }
  }
}

One can also build compound queries:

GET /flight/_search?pretty
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "Icao": "A0835D"
          }
        },
        {
          "range": {
            "Alt": {
              "lte": 10000
            }
          }
        }
      ]
    }
  }
}

We can improve the performance like so:

GET /flight/_search?pretty
{
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "Icao": "A0835D"
          }
        }
      ],
      "filter": [
        {
          "range": {
            "Alt": {
              "lte": 10000
            }
          }
        }
      ]
    }
  }
}

Alternatives to must are: must_not, should.

But why does the second version improve performance? The filter object is for clauses that either match or don’t; there is no notion of how well a document matches. Scoring how well a document satisfies the condition that the altitude is ≤ 10000 adds no value here, yet a range clause placed inside must still contributes to the relevance score. Additionally, the results of filter clauses can be cached, which can improve the performance of subsequent queries.
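The rule of thumb above (clauses that should influence relevance go into must, yes/no conditions go into filter) can be captured in a small helper. This is a sketch under stated assumptions: the function name bool_query and its parameter names are hypothetical, and it only builds the request body, it does not send it.

```python
import json


def bool_query(scored=(), unscored=()):
    """Build a bool query body.

    scored   -> clauses placed in "must" (they contribute to the score)
    unscored -> clauses placed in "filter" (match or not, no scoring)
    """
    bool_body = {}
    if scored:
        bool_body["must"] = list(scored)
    if unscored:
        bool_body["filter"] = list(unscored)
    return {"query": {"bool": bool_body}}


body = bool_query(
    scored=[{"match": {"Icao": "A0835D"}}],
    unscored=[{"range": {"Alt": {"lte": 10000}}}],
)
print(json.dumps(body, indent=2))
```

The printed body is equivalent to the second compound query shown above.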

2.7 Update documents

To update a document that you know the ID of, use the _update API:

POST /flight/_update/4800770
{
  "doc": {
    "Mil": true
  }
}

With the above command we can add new fields to a document, too.

As a side note, ElasticSearch documents are immutable! When we request an update of a document, what ElasticSearch does under the hood is retrieve the document, change its fields and reindex the document with the same ID, effectively replacing it.
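The retrieve, change and reindex sequence can be illustrated with a toy in-memory model. It is not ElasticSearch code: the dictionary named index and the functions index_doc and partial_update are hypothetical, and the merge is shallow.

```python
index = {}  # toy in-memory "index": document ID -> stored entry


def index_doc(doc_id, source):
    """Store a whole document, replacing any previous one with the same ID."""
    previous = index.get(doc_id)
    version = previous["_version"] + 1 if previous else 1
    index[doc_id] = {"_version": version, "_source": dict(source)}


def partial_update(doc_id, doc):
    """Model of a partial update on top of immutable documents."""
    current = index[doc_id]["_source"]   # 1. retrieve the existing document
    merged = {**current, **doc}          # 2. change or add fields
    index_doc(doc_id, merged)            # 3. reindex under the same ID


index_doc(4800770, {"Icao": "494102", "Bad": False})
partial_update(4800770, {"Mil": True})
print(index[4800770])
```

In this model the stored entry is never modified in place; the update produces a new entry with a higher version, which replaces the old one.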

More sophisticated updates can be performed using scripts, e.g.:

POST /flight/_update/4800770
{
  "script": {
    "source": "ctx._source.FlightsCount++"  
  }
}

ctx means context. There are many other ways to update documents, e.g. upserts (i.e. conditionally update or insert a document based on whether or not the document already exists).

POST /flight/_update/4800771
{
  "script": {
    "source": "ctx._source.FlightsCount++"  
  },
  "upsert": {
    "Rcvr":1,
    "HasSig":false,
    "Icao":"AE4839",
    ... 
  }
}

Since a document with ID 4800771 doesn’t exist, the contents of "upsert" are inserted as a new document. You can also use the _update_by_query API:

POST /flight/_update_by_query
{
  "script": {
    "source": "ctx._source.FlightsCount++"  
  },
  "query": {
    "match_all": {}
  }
}

Keep in mind that if the query above fails, it is aborted, not rolled back! The query does not run within a transaction, which means that some documents may have been updated and others not. This is why it is important to take a backup before you run update queries.

To replace documents, simply use the same ID as an existing document:

PUT /flight/_doc/4800770
{
   "Rcvr":1,
   "HasSig":false,
   "Icao":"494102",
   "Bad":false,
   ...
}

2.8 Delete documents

There is also a _delete_by_query API:

POST /flight/_delete_by_query
{
  "query": {
    "match_all": {}
  }
}

2.9 Bulk queries

The Bulk API helps us perform these actions on many documents with a single request. This API supports four actions: index, create, update and delete.

POST /_bulk
{ "index": { "_index" : "flight", "_id": 10519389 } }
{ "Rcvr":1,"HasSig":true,"Sig":0,"Icao":"A0835D","Bad":false, ... }
{ "create": { "_index" : "flight", "_id": 4800770 } }
{"Rcvr":1,"HasSig":false,"Icao":"494102","Bad":false, ... }
{ "update": { "_index" : "flight", "_id": 4800770 } }
{ "doc": {"Mil": true } }
{ "delete": { "_index" : "flight", "_id": 4800770 } }

The difference between index and create actions is the following: if the document already exists, then create will throw an error while index will replace the document.

If the bulk query is to be run against the same index, then we can simplify the query like so:

POST /flight/_bulk
{ "index": { "_id": 10519389 } }
{ "Rcvr":1,"HasSig":true,"Sig":0,"Icao":"A0835D","Bad":false, ... }
{ "create": { "_id": 4800770 } }
{"Rcvr":1,"HasSig":false,"Icao":"494102","Bad":false, ... }
{ "update": { "_id": 4800770 } }
{ "doc": {"Mil": true } }
{ "delete": { "_id": 4800770 } }

A failed action won’t affect the other actions.
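The semantics of the four actions, including the independence of failures, can be mimicked with a toy model. This is a sketch only: the function run_bulk, the tuple layout of the actions and the result strings are assumptions made for illustration and do not reproduce the real response format of the Bulk API.

```python
def run_bulk(store, actions):
    """Apply (action, doc_id, body) tuples to a dict and report each result."""
    results = []
    for action, doc_id, body in actions:
        if action == "index":
            store[doc_id] = dict(body)            # insert or replace
            results.append("ok")
        elif action == "create":
            if doc_id in store:                   # create refuses to overwrite
                results.append("error: already exists")
            else:
                store[doc_id] = dict(body)
                results.append("ok")
        elif action == "update":
            if doc_id not in store:
                results.append("error: missing")
            else:
                store[doc_id].update(body)        # merge the partial document
                results.append("ok")
        elif action == "delete":
            removed = store.pop(doc_id, None)
            results.append("ok" if removed is not None else "not_found")
        else:
            results.append("error: unknown action")
    return results


store = {4800770: {"Icao": "494102"}}  # this document already exists
results = run_bulk(store, [
    ("index", 10519389, {"Icao": "A0835D"}),
    ("create", 4800770, {"Icao": "494102"}),   # fails, the others still run
    ("update", 4800770, {"Mil": True}),
    ("delete", 4800770, None),
])
print(results)
print(store)
```

The failed create is recorded in the results, while the update and delete that follow it are still applied.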

3. Mapping

But how does ElasticSearch know how to map the data? Dynamic mapping means that no mapping is defined explicitly, or at least not for some fields; ElasticSearch then infers the mapping by inspecting the types of the values in a document’s fields. If you are not satisfied with dynamic mapping, you can define an explicit mapping.

To view the mappings of data, type the following in Kibana:

GET /flight/_mapping

ElasticSearch performs date detection and checks if the field’s contents match any of the dynamic data formats that have been defined. By default, that will be the year, month, and day separated by slashes and an optional timestamp. If there is a match, the matching date format will be added to the mapping for the field. A field can have multiple mappings. E.g. a field of type String has two mappings:

  • as the "type" : "text",
  • a "fields" property containing a field named "keyword" with a type of "keyword".

E.g.

"Call" : {
     "type" : "text",
     "fields" : {
        "keyword" : {
          "type" : "keyword",
          "ignore_above" : 256
        }
     }
},

The difference between the two is that the "text" type is used for full-text searches, and the "keyword" type for exact matches (filters), sorting, aggregations etc.
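The difference can be made concrete by looking at what ends up in the index for each type. The following Python sketch only approximates the behaviour: analyze_text imitates the default analyzer by splitting on non-word characters and lowercasing, which is a simplification, and both function names are hypothetical.

```python
import re


def analyze_text(value):
    """Rough imitation of a "text" field: split into lowercase tokens."""
    return [token.lower() for token in re.findall(r"\w+", value)]


def analyze_keyword(value, ignore_above=256):
    """A "keyword" field keeps the whole value as a single term.

    Values longer than ignore_above characters are not indexed.
    """
    return [value] if len(value) <= ignore_above else []


callsign = "Delta Air Lines"  # illustrative value
print(analyze_text(callsign))
print(analyze_keyword(callsign))
```

A full-text search for "air" can match one of the tokens of the text field, whereas an exact match, a sort or an aggregation works on the single unmodified term of the keyword sub-field.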

Apart from the data fields that we specify when indexing, every document stored within an Elasticsearch cluster has some metadata associated with it, called meta-fields:

  • _index: contains the name of the index to which the document belongs.
  • _id: stores the ID of the document
  • _source: contains the original JSON object that was passed to Elasticsearch when indexing the document. The field is not indexed, and therefore you cannot search it, but you can retrieve it.
  • _field_names: contains the names of every field that contains a non-null value.
  • _routing: custom routing to route documents to shards based on a specified value
  • _version: versioning of documents
  • _meta: can store custom data

ElasticSearch data types can be divided into four categories:

  • core: e.g. text, numeric such as byte (-128 to 127), short, integer, long, float, half_float, scaled_float (a float stored as a long), double, date, boolean, binary (with Base64 encoding), range (e.g. {"gte": 1, "lte":10} i.e. 1-10)
  • complex: object (JSON), array (flattens nested objects), nested (each object is indexed as a hidden document, ensuring that, each object is independent)
  • geo: geo_point, geo_shape (point, polygon, linestring, multipoint, multilinestring, multipolygon, geometrycollection, envelope, circle) represented as GeoJSON objects
  • specialized: ip, completion, attachment (requires the Ingest Attachment Processor plugin)

You can supply dates in three ways:

  • as a string (the date format can be configured)
  • as an integer representing the number of seconds since the epoch
  • as a long, representing the milliseconds since the epoch (this is how dates are being stored internally).

You can supply geo_points in one of four ways:

  • as an object with the "lat" and "lon" keys
  • as a string with the latitude and longitude separated by a comma, and in that order
  • as a geohash
  • as an array of the form [longitude, latitude]; note that the order is reversed compared to the string form
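Three of these representations are shown side by side in the sketch below, using the coordinates of the test document from section 2.2; the geohash form is left out. The function names geo_point_forms and with_location are hypothetical, and the "location" field name is taken from the mapping examples of this article.

```python
def geo_point_forms(lat, lon):
    """Return the same point in three of the accepted representations."""
    return {
        "object": {"lat": lat, "lon": lon},
        "string": f"{lat},{lon}",   # latitude first
        "array": [lon, lat],        # longitude first
    }


def with_location(flight):
    """Return a copy of a flight with a "location" built from Lat and Long."""
    doc = dict(flight)
    doc["location"] = {"lat": flight["Lat"], "lon": flight["Long"]}
    return doc


forms = geo_point_forms(39.984322, -82.925616)
print(forms)
print(with_location({"Icao": "A0835D", "Lat": 39.984322, "Long": -82.925616}))
```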

To add a mapping:

PUT /flight/_mapping 
{
   "properties": {
     "location": {
       "type": "geo_point"
     }
   }
}

Be careful: once a mapping for a field has been created, it cannot be modified. The only way is to delete and re-create the index.

In the following example we manually create the various mappings disabling dynamic mapping.

PUT /flight/_mapping
{
    "dynamic": false,
    "properties": {
      "Rcvr": {
        "type": "integer"
      },
      "Icao": {
        "type": "text"
      },
      ...
      "location": {
        "type": "geo_point"
      }
   }
}

ElasticSearch’s mapping parameters can help create new mappings. E.g. to create a custom location field, we can use the copy_to parameter like so:

PUT /flight/_mapping
{
  "properties": {
    "Lat": {
      "type": "float",
      "copy_to": "location"
    },
    "Long": {
      "type": "float",
      "copy_to": "location"
    },
    "location": {
      "type": "geo_point"
    }
  }
}

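Another mapping parameter that will most probably be useful to you is format, which specifies the date pattern (default "strict_date_optional_time||epoch_millis"). Custom patterns follow the Joda Time syntax in versions before 7.0 and the java.time syntax from 7.0 onwards.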

PUT /flight/_mapping 
{
   "properties": {
     "Year": {
       "type": "date",
       "format": "year"
     }
   }
}

If you have a Unix timestamp in seconds, you can just multiply it by 1000 to obtain epoch milliseconds.

The built-in date formats can be found here.

You might be tempted to add a mapping like so:

"FSeen": {
       "type": "date",
       "format": "\/Date(epoch_millis)\/"
     }

to map "FSeen":"\/Date(1467378028852)\/"; unfortunately, this won’t work. We shall see in the next articles how to process such formats.
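Until then, one possible way to pre-process such values before indexing is sketched below in Python. It is not necessarily the approach of the following articles: the function names wrapped_date_to_millis and seconds_to_millis are hypothetical, and the sketch assumes that the wrapper always has exactly the form shown above.

```python
import json
import re
from datetime import datetime, timedelta, timezone

_DATE_PATTERN = re.compile(r"^/Date\((\d+)\)/$")
_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def wrapped_date_to_millis(value):
    """Extract the epoch milliseconds from a "/Date(<millis>)/" string."""
    match = _DATE_PATTERN.match(value)
    if match is None:
        raise ValueError(f"unexpected date format: {value!r}")
    return int(match.group(1))


def seconds_to_millis(seconds):
    """Convert a Unix timestamp in seconds to epoch milliseconds."""
    return seconds * 1000


raw = r'{"FSeen":"\/Date(1467378028852)\/"}'
doc = json.loads(raw)  # JSON decoding turns the escaped \/ into /
doc["FSeen"] = wrapped_date_to_millis(doc["FSeen"])
seen_at = _EPOCH + timedelta(milliseconds=doc["FSeen"])
print(doc["FSeen"], seen_at.isoformat())
```

A field holding the resulting number could then be mapped as a date with the built-in epoch_millis format.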

If you add fields to the mapping of an index while dynamic mapping is disabled, issue the following query so that the existing documents are re-indexed and pick up the new mapping:

POST /flight/_update_by_query?conflicts=proceed

4. Summary

In this article, we focused on how to use ElasticSearch for its main use, which is searching documents. In the next article we will learn how to import the batch .json files to ElasticSearch after transforming the .json files to the format that ElasticSearch’s bulk API wants, but also by using a JSON library to parse the .json files and inserting the documents to ElasticSearch using its REST API. Then, you can revisit this article and re-run the various search queries to evaluate the results.

5. Download the commands

That was the second part of the article series: Processing real-time data with Storm, Kafka and ElasticSearch.

Download
You can download the commands mentioned in this article here: Processing real-time data with Storm, Kafka and ElasticSearch – Part 2

Ioannis Kostaras

Software architect awarded the 2012 Duke's Choice Community Choice Award and co-organizing the hottest Java conference on earth, JCrete.