Enterprise Java

Manage Kafka Listeners Dynamically in Spring Boot

In contemporary event-driven architectures, proficient management of data streams is paramount. Apache Kafka stands out as a favored solution for this purpose. However, despite the assistance of frameworks like Spring Kafka, integrating it into our applications poses challenges. A significant hurdle lies in effectively implementing dynamic listener management. This aspect is crucial as it offers the necessary flexibility and control to adapt to the evolving workloads and maintenance requirements of our application. Let’s delve into understanding how to manage Kafka listeners dynamically in a Spring Boot application.

1. Introduction

Kafka is an open-source distributed streaming platform developed by LinkedIn and later donated to the Apache Software Foundation. It was designed to handle real-time data streams, making it a highly scalable, fault-tolerant, and distributed system for processing and storing large volumes of event data. Kafka is widely used for various use cases, such as log aggregation, event sourcing, messaging, and real-time analytics.

1.1 Key Concepts

  • Topics: Kafka organizes data streams into topics, which are similar to categories or feeds. Each topic consists of a stream of records or messages.
  • Producers: Producers are applications that publish data to Kafka topics. They write messages to specific topics, and these messages are then stored in the Kafka brokers.
  • Brokers: Kafka brokers are the nodes that form the Kafka cluster. They are responsible for receiving, storing, and serving messages. Each broker holds one or more partitions of a topic.
  • Partitions: Topics can be divided into multiple partitions, essentially ordered message logs. Partitions allow data to be distributed and processed in parallel across different brokers.
  • Consumers: Consumers are applications that read data from Kafka topics. They subscribe to one or more topics and receive messages from the partitions of those topics.
  • Consumer Groups: Consumers can be organized into consumer groups, where each group consists of one or more consumers. Each message in a partition is delivered to only one consumer within a group, allowing parallel processing of data.

2. Setting up Apache Kafka on Docker

Using Docker Compose simplifies the process by defining the services, their dependencies, and network configuration in a single file. It allows for easier management and scalability of the environment. Make sure you have Docker and Docker Compose installed on your system before proceeding with these steps. To set up Apache Kafka on Docker using Docker Compose, follow these steps.

2.1 Creating Docker Compose file

Create a file called docker-compose.yml and open it for editing.

  • Zookeeper Service: The zookeeper service is defined to run Zookeeper, which is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. The configuration for this service is as follows:
    • Image: The service uses the latest version of Confluent’s Zookeeper image (wurstmeister/zookeeper).
    • Container Name: The name of the container is set to zookeeper.
    • Ports: Zookeeper uses port 2181 for client connections, which are mapped from the host to the container.
  • Kafka Service: The kafka service is defined to run Apache Kafka, a distributed streaming platform. Here’s how the Kafka service is configured:
    • Image: The service uses the latest version of Confluent’s Kafka image (wurstmeister/kafka).
    • Container Name: The name of the container is set to kafka.
    • Ports: Kafka uses port 9092 for client connections, which are mapped from the host to the container.
    • Environment Variables: Several environment variables are set to configure Kafka. Notably, KAFKA_ZOOKEEPER_CONNECT specifies the Zookeeper connection string, KAFKA_ADVERTISED_LISTENERS defines the listener for client connections, and KAFKA_CREATE_TOPICS creates a topic named my-topic with 1 partition and a replication factor of 1.
    • Depends On: The Kafka service depends on the zookeeper service, ensuring Zookeeper is running before Kafka starts.

Add the following content to the file and save it once done.

docker-compose.yml

version: '3'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9092,OUTSIDE://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CREATE_TOPICS: "my-topic:1:1"
    depends_on:
      - zookeeper

2.2 Running the Kafka containers

Open a terminal or command prompt in the same directory as the docker-compose.yml file. Start the Kafka containers by running the following command:

Start containers

docker-compose up -d

This command will start the ZooKeeper and Kafka containers in detached mode, running in the background.

Kafka Spring boot Dynamically Manage Listeners-kafka-containers
Fig. 1: Kafka containers

To stop and remove the containers, as well as the network created, use the following command:

Stop containers

docker-compose down

2.3 Creating a Topic

After the Kafka cluster is operational, create the Kafka topic. Go to the directory containing the docker-compose.yml file and execute the following command. It will establish the my_topic topic utilizing a Kafka broker operating on port number 9092.

Kafka Topic

docker-compose exec kafka kafka-topics.sh --create --topic my_topic --partitions 1 --replication-factor 1 --bootstrap-server kafka:9092
Kafka Spring boot Dynamically Manage Listeners-creating-a-topic
Fig. 2: Creating a Topic

3. Configuring the Kafka Consumer

Configuring a Kafka consumer in a Spring Boot application involves setting various properties to define how the consumer behaves and connects to the Kafka cluster. These configurations include the bootstrap servers, group ID, key and value deserializers, and other consumer-specific settings.

3.1 Code Example

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.config.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

The code defines:

  • @EnableKafka enables Kafka support in the Spring application.
  • The consumerFactory() method creates a ConsumerFactory bean that sets up the consumer properties, including bootstrap servers, group ID, and deserializers for keys and values.
  • The kafkaListenerContainerFactory() method creates a ConcurrentKafkaListenerContainerFactory bean, which is used to create Kafka listener containers.
  • setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE) configures the acknowledgment mode to manual, allowing for more control over message processing.

4. Configuring the Kafka Listener

A Kafka listener is configured to listen to specific topics and process messages from those topics. This can be done using the @KafkaListener annotation.

4.1 Code Example

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {

    @KafkaListener(topics = "my_topic", groupId = "group_id")
    public void listen(String message, Acknowledgment acknowledgment) {
        System.out.println("Received message: " + message);
        // Process the message
        acknowledgment.acknowledge(); // Manually acknowledge the message
    }
}

The code defines:

  • @KafkaListener specifies the topic to listen to and the consumer group ID.
  • The listen method is the message handler that processes incoming messages from the specified topic.
  • The Acknowledgment object is used to manually acknowledge the message, which is necessary when using manual acknowledgment mode.

5. Dynamically Controlling the Listener

In some scenarios, you may need to start or stop the Kafka listener dynamically. This can be achieved using the KafkaListenerEndpointRegistry.

5.1 Code Example

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

@Service
public class ListenerControlService {

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    public void stopListener() {
        MessageListenerContainer listenerContainer = 
            kafkaListenerEndpointRegistry.getListenerContainer("myListenerId");
        if (listenerContainer != null) {
            listenerContainer.stop();
        }
    }

    public void startListener() {
        MessageListenerContainer listenerContainer = 
            kafkaListenerEndpointRegistry.getListenerContainer("myListenerId");
        if (listenerContainer != null) {
            listenerContainer.start();
        }
    }
}

The code defines:

  • KafkaListenerEndpointRegistry is injected to manage Kafka listener containers.
  • The stopListener and startListener methods retrieve the listener container by ID and call the appropriate methods to stop or start it.
  • The listener ID should be set in the Kafka listener configuration to identify it uniquely.

6. Validate Dynamic Listener Controls

To ensure that the dynamic controls on the listener work as expected, you can write unit tests using the Spring Test framework.

6.1 Code Example

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import static org.junit.jupiter.api.Assertions.*;

@SpringBootTest
public class ListenerControlServiceTest {

    @Autowired
    private ListenerControlService listenerControlService;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Test
    public void testStartStopListener() {
        MessageListenerContainer listenerContainer = 
            kafkaListenerEndpointRegistry.getListenerContainer("myListenerId");
        assertNotNull(listenerContainer);

        listenerControlService.stopListener();
        assertFalse(listenerContainer.isRunning());

        listenerControlService.startListener();
        assertTrue(listenerContainer.isRunning());
    }
}

The code defines:

  • @SpringBootTest is used to bootstrap the entire container for integration testing.
  • The test method testStartStopListener ensures the listener can be stopped and started, verifying its running state after each operation.
  • assertNotNull, assertFalse, and assertTrue are used to validate the listener’s state throughout the test.

7. Conclusion

In conclusion, effectively integrating Kafka with Spring Boot to dynamically manage listeners is crucial for building robust and flexible event-driven applications. By configuring Kafka consumers, setting up listeners, and leveraging dynamic control through the KafkaListenerEndpointRegistry, developers can ensure that their applications can adapt to varying workloads and maintenance needs. This approach not only enhances the resilience and scalability of the system but also provides the necessary control to handle real-time data processing efficiently. With the right implementation, dynamic listener management in Kafka Spring Boot applications can significantly improve overall performance and operational flexibility.

Yatin Batra

An experience full-stack engineer well versed with Core Java, Spring/Springboot, MVC, Security, AOP, Frontend (Angular & React), and cloud technologies (such as AWS, GCP, Jenkins, Docker, K8).
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

0 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments
Back to top button