Processing real-time data with Storm, Kafka and ElasticSearch – Part 2
This is the second part of the article series: Processing real-time data with Storm, Kafka, and ElasticSearch.
1. Introduction
In the first part we described the problem and how we are going to solve it. To refresh your memory, the plan is to create a Data Reduction System of historic flight data (which you can freely download from here). We will build our DRS using the following technology stack: Apache Storm → Apache Kafka → ElasticSearch → (Visualisation: Kibana, OpenLayers, OpenMap, etc.).
We already explained the basics of ElasticSearch and how it works in the previous article. In this article, we shall learn how to perform searches in ElasticSearch.
2. CRUD commands
You may be familiar with the acronym CRUD from the databases:
- C – Create
- R – Retrieve or Read
- U – Update
- D – Delete
The following table maps each CRUD command with its respective ElasticSearch HTTP/REST command. These commands apply to indices as well as documents.
| CRUD command | HTTP/REST command |
|--------------|-------------------|
| Create       | PUT or POST       |
| Read         | GET               |
| Update       | PUT or POST       |
| Delete       | DELETE            |
So, let’s start Kibana as we learned in the previous article, and navigate to Kibana’s console.
2.1 Create Indices
To create an index flight, issue the following:
PUT /flight
Then check the cluster's health:
GET /_cluster/health
Notice that now the cluster’s health has changed from green to yellow. This occurs because we are running only a single instance of Elasticsearch. A single node cluster is fully functional, but data cannot be replicated to other nodes to provide resiliency. Replica shards must be available to different nodes for the cluster status to be green. If the cluster status is red, some data are unavailable.
To fix this, you need to create another installation of ElasticSearch (on the same, but preferably on another, machine) and change node.name inside elasticsearch.yml; cluster.name must remain the same in both instances (the default is elasticsearch). An alternative way is to pass configuration parameters to ElasticSearch on the command line, e.g.
bin/elasticsearch -Enode.name=node-2 -Epath.data=./node-2/data -Epath.logs=./node-2/logs
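For reference, here is a minimal sketch of what the second instance's elasticsearch.yml could look like (the node name and paths are illustrative, not taken from the article):

# elasticsearch.yml of the second instance (illustrative values)
cluster.name: elasticsearch   # must match the first instance
node.name: node-2             # must differ from the first instance
path.data: ./node-2/data
path.logs: ./node-2/logs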
GET /_cat/indices?v

health status index  uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   flight w696u4y3SYWuGW--8VzW6Q   1   1          0            0       208b           208b
Our primary shard has a replica shard, but this replica is not allocated to any node.
GET /_cat/shards?v

index  shard prirep state      docs store ip        node
flight 0     p      STARTED       0  208b 127.0.0.1 MacBook-Pro.local
flight 0     r      UNASSIGNED
Notice that the replica shard is unassigned (prirep = r means replica).
2.2 Create Documents
Let’s add some test data to our index:
PUT /flight/_doc/1
{
  "Icao": "A0835D",
  "Alt": 2400,
  "Lat": 39.984322,
  "Long": -82.925616
}
or as a curl command:
curl -X PUT "localhost:9200/flight/_doc/1?pretty" -H 'Content-Type: application/json' -d'
{
  "Icao": "A0835D",
  "Alt": 2400,
  "Lat": 39.984322,
  "Long": -82.925616
}'
Content-Type is important for your query to succeed. We created a new flight with ID = 1. We could also have used POST instead of PUT; in that case, however, we cannot pass an ID, and ElasticSearch will generate an ID for the document automatically. If you now search the index (GET /flight/_search?pretty), ElasticSearch returns the following result:
{ "took" : 2, "timed_out" : false, "_shards" : { "total" : 1, "successful" : 1, "skipped" : 0, "failed" : 0 }, "hits" : { "total" : { "value" : 1, "relation" : "eq" }, "max_score" : 1.0, "hits" : [ { "_index" : "flight", "_type" : "_doc", "_id" : "1", "_score" : 1.0, "_source" : { "Icao" : "A0835D", "Alt" : 2400, "Lat" : 39.984322, "Long" : -82.925616 } } ] } }
The result document is inside the _source key.
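As noted above, you can use POST instead of PUT when you want ElasticSearch to generate the ID for you; a minimal sketch (the generated _id is returned in the response):

POST /flight/_doc
{
  "Icao": "A0835D",
  "Alt": 2400,
  "Lat": 39.984322,
  "Long": -82.925616
}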
2.3 Delete Documents
If you know the document's ID:
DELETE /flight/_doc/1
2.4 Delete indices
To delete an index, simply issue:
DELETE /flight
2.5 Import bulk data
Our scenario will be to process flight data. Ideally, these data would come in real-time from a number of sensors (radars), but since this is a bit difficult to achieve, we shall use batch flight history data that you can download from here. An explanation of the various fields can be found here.
The file needs to end with an empty line; if it doesn't, add one. Inside the directory where you downloaded the batch files, issue the following command (per .json file):
curl -H "Content-Type: application/x-ndjson" -XPOST http://localhost:9200/flights/_bulk --data-binary "@2016-07-01-1300Z.json"
Please note that the content type is "application/x-ndjson" and not "application/x-json". Also note that we pass the data as binary in order to preserve the newlines. The file name is 2016-07-01-1300Z.json. Any pre-existing documents with the same IDs will be replaced by the ones in the .json file.
If you run this command, it will fail. It would be too easy if it worked, wouldn't it? ElasticSearch expects its .json documents to have a specific format:
{"index":{"_id":4800770}} {"Rcvr":1,"HasSig":false,"Icao":"494102", "Bad":false,"Reg":"CS-PHB", ...} ...
This means that you have to convert each downloaded .json file to the above format, which consists of the following steps:
- Add a line that starts with "index" above each actual data document
- Move each document's "Id":<value> to {"_id":<value>}
If you don't want to spend your time manually modifying the .json documents: in the next article we will develop a Java program that parses them and inserts the documents into ElasticSearch using ElasticSearch's REST API. Until then, you may download a sample .json file with only a few flights from here.
2.6 Search queries
ElasticSearch is all about searching. It allows you to use search queries that match some criteria.
GET /flight/_search?pretty
{
  "query": {
    "match_all": { }
  }
}
The above search query matches all documents of the index flight. It can also be simplified like so:
GET /flight/_search
The following query searches for documents that match the given Icao.
GET /flight/_search?pretty
{
  "query": {
    "match": { "Icao": "A0835D" }
  }
}
It’s also possible to perform searches embedded in the request URL:
GET /flight/_search?q=Icao:A0835D
This can also be written as:
GET /flight/_search?pretty
{
  "query": {
    "query_string": {
      "query": "Icao:A0835D"
    }
  }
}
Instead of "match"
and "query_string"
one can also use "term"
. Use "term"
for exact matches (like boolean, numbers, dates, enumerated values, keywords etc.).
GET /flight/_search?pretty
{
  "query": {
    "term": { "Mil": true }
  }
}
You may also use "terms"
to search an array of values.
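For example, a sketch of a terms query that matches documents whose Icao is any of the given values (the values are illustrative; the keyword sub-field is used here because terms performs exact matching):

GET /flight/_search?pretty
{
  "query": {
    "terms": {
      "Icao.keyword": [ "A0835D", "494102" ]
    }
  }
}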
You may search by "wildcard"
and make use of the wildcards *
and/or ?
or by prefix
.
GET /flight/_search?pretty
{
  "query": {
    "wildcard": { "Call": "NJ*" }
  }
}
Please note however that wildcard queries may be slow, especially if the wildcard is at the beginning of the search string.
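A prefix query, by contrast, only anchors the match at the beginning of the term; a minimal sketch (the field and value are illustrative, and the keyword sub-field is used to match the stored value exactly):

GET /flight/_search?pretty
{
  "query": {
    "prefix": { "Call.keyword": "NJ" }
  }
}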
You can also use "regexp" and provide a search string with regular expression special characters.
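For instance, a regexp query sketch that matches call signs starting with NJ followed by digits (the field and pattern are illustrative):

GET /flight/_search?pretty
{
  "query": {
    "regexp": { "Call.keyword": "NJ[0-9]+" }
  }
}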
Or you can use "ids"
to search an array of document IDs, or "range"
to provide a range of values (using gt
, gte
, lt
, lte
for >, ≥, <, ≤
respectively).
GET /flight/_search?pretty
{
  "query": {
    "range": {
      "Year": {
        "gte": "2010",
        "lt": "2016"
      }
    }
  }
}
You can also use anchor dates, e.g. "gte": "2010/01/01||-1y", which resolves to one year before the anchor date 1 January 2010 (i.e. it matches all dates from 1 January 2009 onwards). More information can be found here.
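The ids query mentioned above fetches documents by their IDs; a minimal sketch (the IDs are illustrative):

GET /flight/_search?pretty
{
  "query": {
    "ids": {
      "values": [ "1", "4800770" ]
    }
  }
}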
To search for documents that have non-null values in a field:
GET /flight/_search?pretty
{
  "query": {
    "exists": { "field": "Cou" }
  }
}
You may also use "match_phrase"
to match a phrase or "multi_match"
to match many fields:
GET /flight/_search?pretty
{
  "query": {
    "multi_match": {
      "query": false,
      "fields": ["Mil", "Gnd"]
    }
  }
}
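And a match_phrase sketch, which requires the terms to appear together and in the given order (the field and phrase are illustrative):

GET /flight/_search?pretty
{
  "query": {
    "match_phrase": { "Cou": "United States" }
  }
}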
One can also build compound queries:
GET /flight/_search?pretty
{
  "query": {
    "bool": {
      "must": [
        { "match": { "Icao": "A0835D" } },
        { "range": { "Alt": { "lte": 10000 } } }
      ]
    }
  }
}
We can improve the performance like so:
GET /flight/_search?pretty
{
  "query": {
    "bool": {
      "must": [
        { "match": { "Icao": "A0835D" } }
      ],
      "filter": [
        { "range": { "Alt": { "lte": 10000 } } }
      ]
    }
  }
}
Alternatives to must are must_not and should.
But why does the second version improve the performance? The filter clause is for query parts that either match or don't; there is no notion of how well a document matches. Scoring how well a document matches the condition that the altitude is ≤ 10000 doesn't add any value in this case (when the range is placed inside must, its result contributes to the score of how well the query matches). Additionally, filter caches its results, which can improve performance for subsequent queries.
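To illustrate the must_not and should alternatives mentioned above (the criteria are illustrative): must_not excludes matching documents, while should clauses are optional and only boost the relevance score of documents that satisfy them:

GET /flight/_search?pretty
{
  "query": {
    "bool": {
      "must_not": [
        { "term": { "Mil": true } }
      ],
      "should": [
        { "range": { "Alt": { "lte": 10000 } } }
      ]
    }
  }
}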
2.7 Update documents
To update a document whose ID you know, use the _update API:
POST /flight/_update/4800770
{
  "doc": { "Mil": true }
}
With the above command we can add new fields to a document, too.
As a side-note, ElasticSearch documents are immutable! What ElasticSearch does under the hood when we request an update of a document is retrieve the document, change its fields, and reindex the document with the same ID, effectively replacing it.
More sophisticated queries can be sent using scripts, e.g.:
POST /flight/_update/4800770
{
  "script": {
    "source": "ctx._source.FlightsCount++"
  }
}
ctx means context. There are many other ways to update documents, e.g. upserts (i.e. conditionally update or insert a document, depending on whether it already exists):
POST /flight/_update/4800771
{
  "script": {
    "source": "ctx._source.FlightsCount++"
  },
  "upsert": {
    "Rcvr": 1,
    "HasSig": false,
    "Icao": "AE4839",
    ...
  }
}
Since the document with ID 4800771 doesn't exist, the "upsert" part is used. You can also use the _update_by_query API:
POST /flight/_update_by_query
{
  "script": {
    "source": "ctx._source.FlightsCount++"
  },
  "query": {
    "match_all": {}
  }
}
Keep in mind that if the query above fails, it is aborted, not rolled back! The query does not run within a transaction, which means that some documents may be updated and some not. This is why it is important to take a backup before you run update queries.
To replace documents, simply use the same ID as an existing document:
PUT /flight/_doc/4800770
{
  "Rcvr": 1,
  "HasSig": false,
  "Icao": "494102",
  "Bad": false,
  ...
}
2.8 Delete documents
There is also a _delete_by_query API:
POST /flight/_delete_by_query
{
  "query": {
    "match_all": {}
  }
}
2.9 Bulk queries
The Bulk API lets us perform many such operations on documents with a single query. It supports 4 actions: index, create, update, delete.
POST /_bulk
{ "index": { "_index": "flight", "_id": 10519389 } }
{ "Rcvr":1,"HasSig":true,"Sig":0,"Icao":"A0835D","Bad":false, ... }
{ "create": { "_index": "flight", "_id": 4800770 } }
{ "Rcvr":1,"HasSig":false,"Icao":"494102","Bad":false, ... }
{ "update": { "_index": "flight", "_id": 4800770 } }
{ "doc": { "Mil": true } }
{ "delete": { "_index": "flight", "_id": 4800770 } }
The difference between index and create actions is the following: if the document already exists, then create will throw an error while index will replace the document.
If the bulk query is to be run against the same index, then we can simplify the query like so:
POST /flight/_bulk
{ "index": { "_id": 10519389 } }
{ "Rcvr":1,"HasSig":true,"Sig":0,"Icao":"A0835D","Bad":false, ... }
{ "create": { "_id": 4800770 } }
{ "Rcvr":1,"HasSig":false,"Icao":"494102","Bad":false, ... }
{ "update": { "_id": 4800770 } }
{ "doc": { "Mil": true } }
{ "delete": { "_id": 4800770 } }
A failed action won’t affect the other actions.
3. Mapping
But how does ElasticSearch know how to map the data? Dynamic mapping means that no mapping is defined explicitly, or at least not for some fields. This is done by inspecting the types of values for a document’s fields. If you are not satisfied with dynamic mapping, then you can have explicit mapping.
To view the mappings of data, type the following in Kibana:
GET /flight/_mapping
ElasticSearch performs date detection and checks whether a field's contents match any of the dynamic date formats that have been defined. By default, that is year, month, and day separated by slashes, plus an optional timestamp. If there is a match, the matching date format is added to the mapping for the field. A field can have multiple mappings. E.g. a field of type String gets two mappings:
- a "type": "text" mapping,
- a "fields" property containing a field named "keyword" with a type of "keyword".
For example:
"Call" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } },
The difference between the two is that the "text" type is used for full-text searches, and the "keyword" type for exact matches (filters), sorting, aggregations, etc.
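As a rough illustration (the call sign value is made up), a match query on the text field goes through full-text analysis, whereas a term query on the keyword sub-field requires the exact stored value:

GET /flight/_search?pretty
{
  "query": {
    "match": { "Call": "njc102" }
  }
}

GET /flight/_search?pretty
{
  "query": {
    "term": { "Call.keyword": "NJC102" }
  }
}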
Apart from the data fields that we specify when indexing documents, every document stored in an ElasticSearch cluster has some metadata associated with it, the so-called meta-fields:
- _index: contains the name of the index to which the document belongs.
- _id: stores the ID of the document.
- _source: contains the original JSON object that was passed to ElasticSearch when indexing the document. The field is not indexed, and therefore you cannot search it, but you can retrieve it.
- _field_names: contains the names of every field that contains a non-null value.
- _routing: custom routing, to route documents to shards based on a specified value.
- _version: versioning of documents.
- _meta: can store custom data.
ElasticSearch data types can be divided into four categories:
- core: e.g. text, numeric such as byte (-128 to 127), short, integer, long, float, half_float, scaled_float (a float stored as a long), double, date, boolean, binary (with Base64 encoding), range (e.g. {"gte": 1, "lte": 10}, i.e. 1-10)
- complex: object (JSON), array (flattens nested objects), nested (each object is indexed as a hidden document, ensuring that each object is independent)
- geo: geo_point, geo_shape (point, polygon, linestring, multipoint, multilinestring, multipolygon, geometrycollection, envelope, circle), represented as GeoJSON objects
- specialized: ip, completion, attachment (requires the Ingest Attachment Processor plugin)
You can supply dates in three ways:
- as a string (the date format can be configured)
- as an integer representing the number of seconds since the epoch
- as a long representing the milliseconds since the epoch (this is how dates are stored internally).
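As a hedged sketch, assuming a field FirstSeen explicitly mapped as a date (the index name flight2 and the field name are illustrative, not taken from the flight data), the same instant can be supplied either as a formatted string or as epoch milliseconds:

PUT /flight2

PUT /flight2/_mapping
{
  "properties": {
    "FirstSeen": { "type": "date" }
  }
}

PUT /flight2/_doc/1
{ "FirstSeen": "2016-07-01T13:00:00Z" }

PUT /flight2/_doc/2
{ "FirstSeen": 1467378000000 }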
You can supply geo_points in one of four ways (see the sketch after the mapping below):
- as an object with the "lat" and "lon" keys
- as a string with the latitude and longitude separated by a comma, in that order
- as a geohash
- as an array with the longitude and the latitude as values
To add a mapping:
PUT /flight/_mapping
{
  "properties": {
    "location": { "type": "geo_point" }
  }
}
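Once the location field is mapped as a geo_point, the four forms can be supplied roughly like this (the coordinates and geohash are illustrative):

PUT /flight/_doc/2
{ "location": { "lat": 39.984322, "lon": -82.925616 } }

PUT /flight/_doc/3
{ "location": "39.984322,-82.925616" }

PUT /flight/_doc/4
{ "location": "dphpn0" }

PUT /flight/_doc/5
{ "location": [ -82.925616, 39.984322 ] }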
Be careful: once a mapping for a field has been created, it cannot be modified. The only way is to delete and re-create the index.
In the following example we manually create the various mappings disabling dynamic mapping.
PUT /flight/_mapping
{
  "dynamic": false,
  "properties": {
    "Rcvr": { "type": "integer" },
    "Icao": { "type": "text" },
    ...
    "location": { "type": "geo_point" }
  }
}
ElasticSearch's mapping parameters can help create new mappings. E.g. to create a custom location field, we can use the copy_to parameter like so:
PUT /flight/_mapping
{
  "properties": {
    "Lat": { "type": "float", "copy_to": "location" },
    "Long": { "type": "float", "copy_to": "location" },
    "location": { "type": "geo_point" }
  }
}
Another mapping parameter that will most probably be useful to you is format, which follows the Joda Time formats (the default is "strict_date_optional_time||epoch_millis").
PUT /flight/_mapping
{
  "properties": {
    "Year": { "type": "date", "format": "year" }
  }
}
If you have a Unix timestamp in seconds, you can just multiply it by 1000 to get milliseconds.
The built-in date formats can be found here.
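For instance, assuming a numeric field that already holds epoch milliseconds (the field name PosTime is used only as an illustration here), a mapping sketch could be:

PUT /flight/_mapping
{
  "properties": {
    "PosTime": { "type": "date", "format": "epoch_millis" }
  }
}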
You might be tempted to add a mapping like so:
"FSeen": { "type": "date", "format": "\/Date(epoch_millis)\/" }
to map "Fseen":"\/Date(1467378028852)\/"
however this won’t work unfortunately. We shall see in the next articles how to process such formats.
If you update a mapping while dynamic mapping is disabled, issue the following query so that the existing documents pick up the new mapping:
POST /flight/_update_by_query?conflicts=proceed
4. Summary
In this article, we focused on how to use ElasticSearch for its main purpose, which is searching documents. In the next article we will learn how to import the batch .json files into ElasticSearch, both by transforming them to the format that ElasticSearch's bulk API expects and by using a JSON library to parse them and insert the documents into ElasticSearch through its REST API. Then, you can revisit this article and re-run the various search queries to evaluate the results.
5. References
- Andhavarapu A. (2017), Learning ElasticSearch, Packt.
- Dixit B. (2016), ElasticSearch Essentials, Packt.
- ElasticSearch tutorial
- Gormley C. & Tong Z. (2015), ElasticSearch The Definitive Guide, O’Reilly.
- JavaCodeGeeks, ElasticSearch Tutorial, mini book.
- Pranav S. & Sharath K. M. N. (2017), Learning Elastic Stack 6.0, Packt.
- Redko A. (2017), ElasticSearch Tutorial, JavaCodeGeeks.
- Srivastava A. & Azarmi B. (2019), Learning Kibana 7, 2nd Ed. Packt.
- Wong W. T. (2019), Advanced ElasticSearch 7.0, Packt.
6. Download the commands
That was the second part of the article series: Processing real-time data with Storm, Kafka and ElasticSearch.
You can download the commands mentioned in this article here: Processing real-time data with Storm, Kafka and ElasticSearch – Part 2