Kafka Connector Sink To Elasticsearch Example
Apache Kafka is a distributed streaming platform that excels at handling real-time data feeds. Elasticsearch is a distributed search and analytics engine designed for horizontal scalability and near real-time search. The Kafka Connect Elasticsearch Sink Connector bridges these two technologies, enabling seamless data flow from Kafka topics to Elasticsearch indices. Integrating Apache Kafka with Elasticsearch creates a powerful data pipeline that combines Kafka’s streaming capabilities with Elasticsearch’s search and analytics functionality. This article provides a guide on how to configure a Kafka connector sink to Elasticsearch, effectively connecting these systems using the Kafka Connect Elasticsearch Sink Connector.
1. Benefits of Using Kafka Connect
Kafka Connect is a powerful framework designed to stream data between Kafka and external systems such as databases, cloud storage, and search engines like Elasticsearch. It simplifies data movement without requiring complex custom code.
- Scalability and Fault Tolerance: Kafka Connect is highly scalable and fault-tolerant, allowing it to handle large-scale data ingestion seamlessly. It supports distributed mode, enabling multiple workers to share the load and recover from failures automatically.
- Pre-built Connectors: Kafka Connect provides a rich ecosystem of connectors, including the Elasticsearch Sink Connector, which reduces development effort and ensures seamless integration between Kafka and Elasticsearch.
- Schema Management and Evolution: With support for Schema Registry, Kafka Connect ensures that schema changes in Kafka topics are gracefully handled when transferring data to downstream systems like Elasticsearch.
- Minimal Configuration, Maximum Flexibility: Kafka Connect allows data ingestion with minimal configuration, using JSON-based settings to define source and sink connectors. At the same time, it offers flexibility through custom processing logic when needed.
- Exactly-Once and At-Least-Once Semantics: Kafka Connect provides at-least-once delivery guarantees out of the box, and the Elasticsearch sink can achieve effectively exactly-once results when document IDs are derived from Kafka record keys, so retried writes overwrite the same document instead of creating duplicates during the transfer from Kafka to Elasticsearch.
2. Setting Up Kafka and Elasticsearch
To integrate Kafka with Elasticsearch, we need the following components:
- Apache Kafka – A distributed event streaming platform.
- Elasticsearch – A search and analytics engine.
- Kafka Connect Elasticsearch Sink Connector – A Kafka Connect plugin that pushes data from Kafka to Elasticsearch.
2.1 Running Kafka and Elasticsearch Using Docker
To set up Zookeeper, Kafka, Kafka Connect, and Elasticsearch quickly, we use Docker Compose. The following docker-compose.yml file defines the necessary services:
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.6.0
    environment:
      discovery.type: single-node
      xpack.security.enabled: "false"
    ports:
      - "9200:9200"

  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.0.1
    depends_on:
      - kafka
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    ports:
      - "8083:8083"
This docker-compose.yml file defines a streaming data pipeline that integrates Apache Kafka with Elasticsearch using Kafka Connect. It sets up four essential services: Zookeeper, Kafka, Elasticsearch, and Kafka Connect, each playing a key role in data ingestion and storage. Here’s a breakdown of each service:
2.1.1 Zookeeper
Manages Kafka’s cluster state and leader election. Runs on port 2181 using the Confluent Zookeeper image.
2.1.2 Kafka Broker
The Kafka broker manages message queues, distributes events, connects to Zookeeper for coordination (KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181), listens on port 9092 for producer and consumer communication, and defines accessible endpoints using KAFKA_ADVERTISED_LISTENERS.
2.1.3 Elasticsearch
Elasticsearch indexes Kafka messages for real-time search, operates in single-node mode (discovery.type=single-node), exposes port 9200 for queries, and has security disabled (xpack.security.enabled=false).
2.1.4 Kafka Connect
Kafka Connect streams data from Kafka to external systems like Elasticsearch, connects to Kafka (CONNECT_BOOTSTRAP_SERVERS="kafka:9092"), stores configurations and offsets in Kafka topics, converts messages to JSON format for Elasticsearch, and exposes port 8083 for REST API management.
This setup fully integrates Kafka and Elasticsearch, allowing Kafka messages to be streamed directly into Elasticsearch.
Run the services with:
docker compose up
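Once the containers are up, a quick programmatic check that the broker is reachable can be useful before configuring any connectors. The following is a minimal sketch using Kafka’s AdminClient to list topics; it assumes the JVM can resolve the advertised listener kafka:9092 (for example, by running inside the Docker network or adding a hosts entry), since that is the address the broker advertises in this compose file.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class ClusterCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // The broker advertises kafka:9092, so this address must be resolvable from where the JVM runs.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Listing topics is a cheap way to confirm the broker is reachable and responding.
            admin.listTopics().names().get().forEach(System.out::println);
        }
    }
}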
3. Configuring Kafka Connect Elasticsearch Sink
Kafka Connect is used to transfer data from Kafka to Elasticsearch. We need to configure the Elasticsearch Sink Connector.
3.1 Installing the Elasticsearch Connector
To integrate Kafka with Elasticsearch, we must install the Kafka Connect Elasticsearch Sink Connector inside the Kafka Connect container. This can be done using the following command:
docker exec -it kafka-elasticsearch-streaming-pipeline-kafka-connect-1 bash -c \
  "confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:latest"
This command runs inside the Kafka Connect container, with -it enabling interactive mode. The container name kafka-elasticsearch-streaming-pipeline-kafka-connect-1 specifies where the command runs. It installs the Elasticsearch Kafka connector from Confluent Hub (confluentinc/kafka-connect-elasticsearch:latest), using --no-prompt for automated installation.
Why Is This Necessary?
Kafka Connect does not come with the Elasticsearch Sink Connector by default. This command installs the connector inside the Kafka Connect container, allowing it to send Kafka messages to Elasticsearch for indexing.
After installation, restart the Kafka Connect container to apply the changes.
docker restart kafka-elasticsearch-streaming-pipeline-kafka-connect-1
Check Available Connectors in Kafka Connect
Run the following command to list all available connectors in Kafka Connect:
curl -s http://localhost:8083/connector-plugins
This will output a JSON response showing the available connectors. The response should include io.confluent.connect.elasticsearch.ElasticsearchSinkConnector.
3.2 Configuring the Elasticsearch Sink Connector
After installing the Elasticsearch Sink Connector, we need to configure it to define how Kafka messages are sent to Elasticsearch. This configuration specifies the Kafka topic to consume from, the Elasticsearch index to write to, and other essential parameters such as connection settings, data conversion, and error handling.
Below is the JSON configuration for setting up the Kafka Connect Elasticsearch Sink Connector. Create a configuration file named elasticsearch-sink.json:
{
  "name": "elastic-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "logs",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}
This JSON file specifies the essential parameters: the Kafka topic to consume from (logs), the Elasticsearch endpoint (http://elasticsearch:9200), and the connector class (io.confluent.connect.elasticsearch.ElasticsearchSinkConnector). Key settings include key.ignore: true and schema.ignore: true, which tell the connector to generate document IDs itself and to write messages without enforcing a schema.
Deploy the Connector
To apply the Elasticsearch Sink Connector configuration, we need to send it to the Kafka Connect REST API. This allows Kafka Connect to register and manage the connector. Deploy the connector with the following command:
curl -X POST -H "Content-Type: application/json" --data @elasticsearch-sink.json http://localhost:8083/connectors
Verify that the connector is running:
curl http://localhost:8083/connectors/elastic-sink/status
Output:
1 | {"name":"elastic-sink","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"sink"} |
4. Producing Data to Kafka
Now that we have set up Kafka and Elasticsearch, the next step is to send data to Kafka, which will later be indexed in Elasticsearch using the Kafka Connect Sink. We will create our Kafka topic and produce sample messages that the Elasticsearch Sink Connector will consume.
docker exec -it kafka-elasticsearch-streaming-pipeline-kafka-1 kafka-topics --create --topic logs --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1
Once the topic is available, we can produce sample JSON messages to test the data flow from Kafka to Elasticsearch. Below is the command to produce sample JSON messages:
docker exec -it kafka-elasticsearch-streaming-pipeline-kafka-1 kafka-console-producer --broker-list kafka:9092 --topic logs
After running this command, you can enter JSON messages line by line, and each message will be published to the specified Kafka topic.
1 2 | { "timestamp" : "2025-03-09T12:00:00Z" , "level" : "INFO" , "message" : "Kafka to Elasticsearch integration working" } { "timestamp" : "2025-03-11T12:05:30Z" , "level" : "ERROR" , "message" : "Database connection timeout" , "service" : "auth-service" , "ip" : "192.168.1.101" } |
Press Enter after each message.
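The console producer is convenient for a quick test, but in a real application the same JSON log events would typically be published from code. The following is a minimal sketch of a Java producer for the logs topic; note that this compose file advertises the broker as kafka:9092, so the program would need to run where that hostname resolves (for example, inside the Docker network), or the advertised listeners would need to be adjusted for the host machine.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class LogProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Must be an address the broker advertises and that this JVM can resolve.
        props.put("bootstrap.servers", "kafka:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String event = "{\"timestamp\":\"2025-03-09T12:00:00Z\",\"level\":\"INFO\","
                    + "\"message\":\"Kafka to Elasticsearch integration working\"}";
            // With key.ignore=true in the sink configuration, no record key is required;
            // the connector generates the Elasticsearch document ID itself.
            producer.send(new ProducerRecord<>("logs", event));
            producer.flush();
        }
    }
}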
5. Querying Data in Elasticsearch
Once the data is successfully sent from Kafka to Elasticsearch, we can verify that it has been indexed correctly. Elasticsearch provides a RESTful API that allows us to query stored data. We can retrieve the indexed documents using:
curl -X GET "http://localhost:9200/logs/_search?pretty=true"
You should now see the messages indexed under the logs index in Elasticsearch.
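The same search can be issued from Java. The sketch below is a minimal illustration using the JDK HTTP client against the REST endpoint exposed on localhost:9200; in a real application the official Elasticsearch Java client would be the usual choice.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LogsQuery {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Search the logs index; security is disabled in this setup, so no credentials are needed.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/logs/_search?pretty=true"))
                .GET()
                .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // JSON hits containing the indexed Kafka messages
    }
}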
6. Conclusion
In this article, we explored how to connect Apache Kafka with Elasticsearch using the Kafka Connect Sink. We set up Kafka and Elasticsearch using Docker, installed and configured the Elasticsearch Sink Connector, and demonstrated how to produce data to Kafka and query it in Elasticsearch. By leveraging Kafka Connect, we can efficiently stream real-time data from Kafka to Elasticsearch, enabling powerful search and analytics capabilities. This integration is ideal for use cases like log aggregation, monitoring, and real-time data visualization. With the setup in place, we can further optimize our configuration, secure our deployment, and scale our architecture based on our application needs.
7. Download the Source Code
This article covered the Kafka connector sink for integrating with Elasticsearch.
You can download the full source code of this example here: Java Kafka connector sink elasticsearch