
Kafka Connector Sink To Elasticsearch Example

Apache Kafka is a distributed streaming platform that excels at handling real-time data feeds. Elasticsearch is a distributed search and analytics engine designed for horizontal scalability and near real-time search. The Kafka Connect Elasticsearch Sink Connector bridges these two technologies, enabling seamless data flow from Kafka topics to Elasticsearch indices and combining Kafka’s streaming capabilities with Elasticsearch’s search and analytics functionality. This article provides a guide on how to configure a Kafka connector sink to Elasticsearch using the Kafka Connect Elasticsearch Sink Connector.

1. Benefits of Using Kafka Connect

Kafka Connect is a powerful framework designed to stream data between Kafka and external systems such as databases, cloud storage, and search engines like Elasticsearch. It simplifies data movement without requiring complex custom code.

  • Scalability and Fault Tolerance: Kafka Connect is highly scalable and fault-tolerant, allowing it to handle large-scale data ingestion seamlessly. It supports distributed mode, enabling multiple workers to share the load and recover from failures automatically.
  • Pre-built Connectors: Kafka Connect provides a rich ecosystem of connectors, including the Elasticsearch Sink Connector, which reduces development effort and ensures seamless integration between Kafka and Elasticsearch.
  • Schema Management and Evolution: With support for Schema Registry, Kafka Connect ensures that schema changes in Kafka topics are gracefully handled when transferring data to downstream systems like Elasticsearch.
  • Minimal Configuration, Maximum Flexibility: Kafka Connect allows data ingestion with minimal configuration, using JSON-based settings to define source and sink connectors. At the same time, it offers flexibility through custom processing logic when needed.
  • Delivery Guarantees: Kafka Connect provides at-least-once delivery out of the box, ensuring that no data is lost during the transfer from Kafka to Elasticsearch. The Elasticsearch Sink Connector can additionally achieve effectively exactly-once results, because it writes each record with a deterministic document ID, making retried writes idempotent.

2. Setting Up Kafka and Elasticsearch

To integrate Kafka with Elasticsearch, we need the following components:

  • Apache Kafka – A distributed event streaming platform.
  • Elasticsearch – A search and analytics engine.
  • Kafka Connect Elasticsearch Sink Connector – A Kafka Connect plugin that pushes data from Kafka to Elasticsearch.

2.1 Running Kafka and Elasticsearch Using Docker

To set up Kafka, Zookeeper, Kafka Connect, and Elasticsearch quickly, we use Docker Compose. The following docker-compose.yml file defines the necessary services:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"
       
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
       
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.6.0
    environment:
      discovery.type: single-node
      xpack.security.enabled: "false"
    ports:
      - "9200:9200"
 
  kafka-connect:
    image: confluentinc/cp-kafka-connect-base:6.0.1
    depends_on:
      - kafka
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect-cluster"
      CONNECT_CONFIG_STORAGE_TOPIC: "connect-configs"
      CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "connect-status"
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
    ports:
      - "8083:8083"

This docker-compose.yml file defines a streaming data pipeline that integrates Apache Kafka with Elasticsearch using Kafka Connect. It sets up four essential services: Zookeeper, Kafka, Elasticsearch, and Kafka Connect, each playing a key role in data ingestion and storage. Here’s a breakdown of each service:

2.1.1 Zookeeper

Manages Kafka’s cluster state and leader election. Runs on port 2181 using the Confluent Zookeeper image.

2.1.2 Kafka Broker

The Kafka broker manages message queues, distributes events, connects to Zookeeper for coordination (KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181), listens on port 9092 for producer and consumer communication, and defines accessible endpoints using KAFKA_ADVERTISED_LISTENERS.

2.1.3 Elasticsearch

Elasticsearch indexes Kafka messages for real-time search, operates in single-node mode (discovery.type=single-node), exposes port 9200 for queries, and has security disabled (xpack.security.enabled=false).

2.1.4 Kafka Connect

Kafka Connect streams data from Kafka to external systems like Elasticsearch. It connects to Kafka (CONNECT_BOOTSTRAP_SERVERS="kafka:9092"), stores its configurations, offsets, and status in Kafka topics, reads message values as schemaless JSON via the JsonConverter, and exposes port 8083 for REST API management.

This setup fully integrates Kafka and Elasticsearch, allowing Kafka messages to be streamed directly into Elasticsearch.

Run the services with:

docker compose up
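
Once the containers are running, it is worth confirming that Elasticsearch and the Kafka Connect REST API are reachable from the host before continuing. Both commands below should return a small JSON payload; the exact contents depend on the image versions in use.

curl -s http://localhost:9200

curl -s http://localhost:8083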

3. Configuring Kafka Connect Elasticsearch Sink

Kafka Connect is used to transfer data from Kafka to Elasticsearch. We need to configure the Elasticsearch Sink Connector.

3.1 Installing the Elasticsearch Connector

To integrate Kafka with Elasticsearch, we must install the Kafka Connect Elasticsearch Sink Connector inside the Kafka Connect container. This can be done using the following command:

docker exec -it kafka-elasticsearch-streaming-pipeline-kafka-connect-1 bash -c \
  "confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:latest"

This command executes confluent-hub inside the Kafka Connect container, with -it enabling interactive mode. The container name kafka-elasticsearch-streaming-pipeline-kafka-connect-1 specifies where the command runs. It installs the Elasticsearch Kafka Connector from Confluent Hub (confluentinc/kafka-connect-elasticsearch:latest), using --no-prompt for automated installation.


Why Is This Necessary?

Kafka Connect does not come with the Elasticsearch Sink Connector by default. This command installs the connector inside the Kafka Connect container, allowing it to send Kafka messages to Elasticsearch for indexing.

After installation, restart the Kafka Connect container to apply the changes.

docker restart kafka-elasticsearch-streaming-pipeline-kafka-connect-1

Check Available Connectors in Kafka Connect

Run the following command to list all available connectors in Kafka Connect:

curl -s http://localhost:8083/connector-plugins

This will output a JSON response showing the available connectors. The response should include io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, as shown below.
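
An abridged entry for the Elasticsearch connector in that response looks roughly like this (the version field depends on the connector release that was installed):

{
  "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "type": "sink",
  "version": "..."
}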

3.2 Configuring the Elasticsearch Sink Connector

After installing the Elasticsearch Sink Connector, we need to configure it to define how Kafka messages are sent to Elasticsearch. This configuration specifies the Kafka topic to consume from, the Elasticsearch connection settings, and how message keys and schemas are handled.

Below is the JSON configuration for setting up the Kafka Connect Elasticsearch Sink Connector. Create a configuration file elasticsearch-sink.json:

{
  "name": "elastic-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "logs",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}

This JSON file specifies the connector class (io.confluent.connect.elasticsearch.ElasticsearchSinkConnector), the Kafka topic to consume from (logs), and the Elasticsearch endpoint (http://elasticsearch:9200). The connector writes each topic’s records to an index of the same name, so messages from the logs topic land in the logs index. Key settings include key.ignore: true, which lets the connector generate document IDs from the topic, partition, and offset, and schema.ignore: true, which writes the JSON messages without enforcing a schema.

Deploy the Connector

To apply the Elasticsearch Sink Connector configuration, we need to send it to the Kafka Connect REST API. This allows Kafka Connect to register and manage the connector. Deploy the connector with the following command:

curl -X POST -H "Content-Type: application/json" --data @elasticsearch-sink.json http://localhost:8083/connectors

Verify that the connector is running:

curl http://localhost:8083/connectors/elastic-sink/status

Output:

{"name":"elastic-sink","connector":{"state":"RUNNING","worker_id":"kafka-connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"kafka-connect:8083"}],"type":"sink"}

4. Producing Data to Kafka

Now that we have set up Kafka and Elasticsearch, the next step is to send data to Kafka, which will later be indexed in Elasticsearch using the Kafka Connect Sink. We will create our Kafka topic and produce sample messages that the Elasticsearch Sink Connector will consume. First, create the logs topic:

docker exec -it kafka-elasticsearch-streaming-pipeline-kafka-1 kafka-topics --create --topic logs --bootstrap-server kafka:9092 --partitions 1 --replication-factor 1

Once the topic is available, we can produce sample JSON messages to test the data flow from Kafka to Elasticsearch. Below is the command to produce sample JSON messages:

docker exec -it kafka-elasticsearch-streaming-pipeline-kafka-1 kafka-console-producer --bootstrap-server kafka:9092 --topic logs

After running this command, you can enter JSON messages line by line, and each message will be published to the specified Kafka topic.

{"timestamp": "2025-03-09T12:00:00Z", "level": "INFO", "message": "Kafka to Elasticsearch integration working"}
{"timestamp":"2025-03-11T12:05:30Z","level":"ERROR","message":"Database connection timeout","service":"auth-service","ip":"192.168.1.101"}

Press Enter after each message.

5. Querying Data in Elasticsearch

Once the data is successfully sent from Kafka to Elasticsearch, we can verify that it has been indexed correctly. Elasticsearch provides a RESTful API that allows us to query the stored data, so we can retrieve the indexed documents with a search request against the logs index.
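
A minimal query, assuming the default port mapping defined in the docker-compose.yml above, looks like this:

curl -X GET "http://localhost:9200/logs/_search?pretty"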

You should now see the messages indexed under the logs index in Elasticsearch.
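
For illustration, an abridged response, assuming the two sample messages above were indexed, might look roughly like this (the connector generates document IDs from the topic, partition, and offset because key.ignore is true):

{
  "hits": {
    "total": { "value": 2, "relation": "eq" },
    "hits": [
      {
        "_index": "logs",
        "_id": "logs+0+0",
        "_source": {
          "timestamp": "2025-03-09T12:00:00Z",
          "level": "INFO",
          "message": "Kafka to Elasticsearch integration working"
        }
      }
    ]
  }
}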

6. Conclusion

In this article, we explored how to connect Apache Kafka with Elasticsearch using the Kafka Connect Sink. We set up Kafka and Elasticsearch using Docker, installed and configured the Elasticsearch Sink Connector, and demonstrated how to produce data to Kafka and query it in Elasticsearch. By leveraging Kafka Connect, we can efficiently stream real-time data from Kafka to Elasticsearch, enabling powerful search and analytics capabilities. This integration is ideal for use cases like log aggregation, monitoring, and real-time data visualization. With the setup in place, we can further optimize our configuration, secure our deployment, and scale our architecture based on our application needs.

7. Download the Source Code

This article covered the Kafka Connector Sink for integrating with Elasticsearch.

Download
You can download the full source code of this example here: Java Kafka connector sink elasticsearch

Omozegie Aziegbe

Omos Aziegbe is a technical writer and web/application developer with a BSc in Computer Science and Software Engineering from the University of Bedfordshire. Specializing in Java enterprise applications with the Jakarta EE framework, Omos also works with HTML5, CSS, and JavaScript for web development. As a freelance web developer, Omos combines technical expertise with research and writing on topics such as software engineering, programming, web application development, computer science, and technology.