DevOps

Consumer Seek in Kafka

Apache Kafka is a distributed event-streaming platform used to handle real-time data. One of its most powerful features is the ability to control the position of a consumer in a topic’s partition using the seek method. Let us delve into understanding Kafka consumer seek and its significance in managing message consumption from specific offsets.

1. Introduction

In Kafka, consumers process messages from partitions by maintaining an offset, which is the position of the next message to be read. Each message within a partition has a unique offset, allowing consumers to track their progress and ensure that messages are consumed in the correct order. By default, Kafka automatically manages offsets, committing them to a special internal Kafka topic for each consumer group. However, there are scenarios where automatic offset management may not meet your needs, and you might want to manually set or adjust the offset. This can be done using the seek() method in the Kafka Consumer API.

Manually seeking to a specific offset can be particularly useful in the following scenarios:

  • Reprocessing Old Messages: If you need to reprocess a specific set of messages from a past offset (e.g., after a consumer failure or to perform a specific task like backfilling data), you can use the seek() method to start consuming from that exact offset. This ensures that you don’t miss any important messages.
  • Skipping Over Certain Messages: In some cases, you may want to skip certain messages that are no longer relevant or have been processed already. For example, if messages in a particular range are deemed invalid, you can use seek() to start consuming from a later offset, effectively skipping over unwanted messages.
  • Debugging Specific Issues: When troubleshooting consumer behavior, you may need to inspect or reprocess messages starting from a specific offset. By manually seeking to a particular point in the partition, you can focus on the relevant messages that might help identify and resolve the issue.

The seek() method gives you greater control over your Kafka consumer’s behavior, enabling precise management of message consumption. This flexibility is invaluable when building systems that need to handle large volumes of data with fine-grained control over how messages are processed.

2. Kafka Setup on Docker

To experiment with Kafka, you can set up a Kafka cluster locally using Docker. Below is the docker-compose.yml file:

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

To start the Kafka cluster use the following command:

docker-compose up -d

2.1 Create the Topic

Use the Kafka CLI to create a topic named test-topic. Run the following command in the Kafka container:

docker exec -it <kafka-container-name> kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-topic

This command does several things:

  • docker exec -it <kafka-container-name>: This part of the command executes a command inside a running Docker container. Replace <kafka-container-name> with the actual name of your Kafka container. The -it flag allows interactive input and output within the container.
  • kafka-topics --create: The kafka-topics tool is used to create, list, or manage Kafka topics. The --create flag specifically tells Kafka to create a new topic.
  • --bootstrap-server localhost:9092: This option specifies the address of the Kafka broker to connect to. In this case, it is localhost:9092, which is the default address when running Kafka locally on the default port.
  • --replication-factor 1: This flag sets the replication factor for the topic. A replication factor of 1 means there is only one copy of the data (i.e., no replication). This is typically used in local or development environments.
  • --partitions 1: This option sets the number of partitions for the topic. In this case, the topic will have 1 partition, which determines how Kafka divides data across brokers.
  • --topic test-topic: Finally, this specifies the name of the topic being created, which in this case is test-topic.

Together, this command creates a new Kafka topic named test-topic with 1 partition and a replication factor of 1, suitable for testing or local use.

2.1.1 Verify the Topic

To ensure that the topic was created successfully, list all available topics with the following command:

docker exec -it <kafka-container-name> kafka-topics --list --bootstrap-server localhost:9092

The command should display test-topic in the output.

2.2 Send Messages to the Topic

Use the Kafka producer CLI to send messages to the test-topic. Run the following command:

docker exec -it <kafka-container-name> kafka-console-producer --broker-list localhost:9092 --topic test-topic

After running this command, the terminal will wait for your input. Type the messages you want to send, one line at a time, and press Enter to publish them.

Hello Kafka!
Message 1
Message 2
Message 3

3. Seek Using Java API

The Kafka Consumer API in Java provides the seek method to manually set the offset. Here’s an example:

package com.jcg.example;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerSeekExample {
    public static void main(String[] args) {
        // Kafka consumer configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "seek-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer(props);

        String topic = "test-topic";
        TopicPartition partition = new TopicPartition(topic, 0);

        try {
            // Assign a specific partition
            consumer.assign(Collections.singletonList(partition));

            // Seek to a specific offset
            consumer.seek(partition, 5);

            System.out.println("Consuming messages starting from offset 5...");

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Consumed message - key: %s, value: %s, offset: %d%n",
                                      record.key(), record.value(), record.offset());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

3.1 Code Explanation

The KafkaConsumerSeekExample class demonstrates how to manually seek to a specific offset in a Kafka topic using the Java Kafka Consumer API. This functionality is particularly useful when you want to reprocess messages or debug a specific range of offsets in a topic’s partition.

The program starts by setting up the Kafka consumer configuration. A Properties object is created and populated with essential settings:

  • ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG: Specifies the Kafka broker address (localhost:9092 in this case).
  • ConsumerConfig.GROUP_ID_CONFIG: Sets the consumer group ID as seek-group.
  • ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG and ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG: Define the deserialization classes for keys and values.
  • ConsumerConfig.AUTO_OFFSET_RESET_CONFIG: Ensures the consumer starts at the earliest offset if no previous offset exists for the group.

A KafkaConsumer object is created using the configured properties. The consumer is then assigned to a specific topic partition using the assign method. In this example, the topic is test-topic, and the partition is 0.

To control the starting point of consumption, the seek method is called, setting the offset to 5. This tells the consumer to start reading messages from the 5th offset in the assigned partition.

The program then enters an infinite loop where it continuously polls the Kafka broker for new messages using the poll method. The polling interval is set to 100 milliseconds using Duration.ofMillis(100). For each received message, the key, value, and offset are printed to the console using System.out.printf.

Finally, the consumer.close() method is invoked in the finally block to ensure the consumer is properly closed, releasing any resources it holds.

3.2 Code Output

The code when executed gives the following output:

Consuming messages starting from offset 5...
Consumed message - key: null, value: Message 5, offset: 5
Consumed message - key: null, value: Message 6, offset: 6
...

4. Conclusion

Kafka’s seek method is a powerful feature for controlling message consumption. This article demonstrated how to set up Kafka on Docker and use the Java Consumer API for seeking to specific offsets, which is especially helpful for debugging or reprocessing use cases.

Yatin Batra

An experience full-stack engineer well versed with Core Java, Spring/Springboot, MVC, Security, AOP, Frontend (Angular & React), and cloud technologies (such as AWS, GCP, Jenkins, Docker, K8).
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Back to top button