
Creating Kafka Consumers With Reactor Kafka

Reactor Kafka provides a reactive API that integrates Apache Kafka with Project Reactor, enabling developers to create non-blocking, back-pressure-aware Kafka consumers and producers. This reactive approach is particularly valuable for high-throughput systems where traditional blocking I/O operations might become a bottleneck. This article will explore how to create Kafka consumers using Reactor Kafka.

1. Setting Up Dependencies

First, set up a Spring Boot project and include the required dependencies. Our project will need Spring Kafka and Reactor Kafka.

<!-- Spring Kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!-- Reactor Kafka -->
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
</dependency>

2. Configuring Application Properties

Kafka consumers require bootstrap servers, group IDs, and deserializers. These properties are defined in application.yml.

The bootstrap-servers property specifies the Kafka broker addresses, while group-id sets the consumer group name to reactor-consumer-group. The auto-offset-reset option, set to earliest, ensures the consumer starts reading from the beginning of the topic if no committed offset exists. The key-deserializer and value-deserializer properties determine how Kafka deserializes message keys and values.

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: reactor-consumer-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    properties:
      spring:
        json:
          trusted:
            packages: '*'

3. Kafka Consumer Configuration

To consume messages reactively with Reactor Kafka, we need to configure a Kafka consumer with the appropriate properties. This includes specifying the bootstrap servers, consumer group ID, and deserialization strategy. Additionally, we will define a ReceiverOptions bean that holds these configurations and a ReactiveKafkaConsumerTemplate bean, which serves as the primary interface for consuming messages in a reactive manner.

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ReceiverOptions<String, String> receiverOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Configure ReceiverOptions with topic subscription
        return ReceiverOptions.<String, String>create(props)
                .subscription(Collections.singleton("reactor-topic"));
    }

    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(
            ReceiverOptions<String, String> receiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }
}

This configuration class sets up the Kafka consumer using Reactor Kafka. The receiverOptions() method defines consumer properties, including the Kafka broker address, consumer group ID, deserializers, and auto-offset reset policy, then subscribes to the "reactor-topic". The kafkaConsumerTemplate() method creates a ReactiveKafkaConsumerTemplate, which serves as the primary interface for consuming messages reactively.
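ReceiverOptions also exposes commit tuning that influences how offsets acknowledged via receiveAutoAck() are committed. As a minimal sketch (the interval and batch size below are illustrative values, not recommendations), the bean above could be extended like this:

@Bean
public ReceiverOptions<String, String> receiverOptions() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-consumer-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return ReceiverOptions.<String, String>create(props)
            .commitInterval(Duration.ofSeconds(5)) // commit acknowledged offsets every 5 seconds...
            .commitBatchSize(100)                  // ...or once 100 records have been acknowledged
            .subscription(Collections.singleton("reactor-topic"));
}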

4. Creating the Reactor Kafka Consumer

With the Kafka consumer configuration in place, we now implement the consumer service that listens for messages from the subscribed topic. Using ReactiveKafkaConsumerTemplate, we can consume messages reactively, ensuring non-blocking processing. The service subscribes to the Kafka topic, processes incoming messages, and logs them while handling potential errors. We use receiveAutoAck(), which automatically acknowledges each batch of records after it is emitted, giving at-least-once delivery semantics.

@Service
public class ReactiveKafkaConsumerService {

    private static final Logger log = LoggerFactory.getLogger(ReactiveKafkaConsumerService.class);

    private final ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate;

    private Disposable subscription;

    public ReactiveKafkaConsumerService(ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate) {
        this.kafkaConsumerTemplate = kafkaConsumerTemplate;
    }

    @PostConstruct
    public void startConsumer() {
        // Keep the Disposable so the stream can be stopped on shutdown
        subscription = kafkaConsumerTemplate
                .receiveAutoAck()
                .map(consumerRecord -> {
                    log.info("Received message: {}", consumerRecord.value());
                    return consumerRecord.value();
                })
                .doOnError(e -> log.error("Error processing message", e))
                .subscribe();
    }

    @PreDestroy
    public void stopConsumer() {
        if (subscription != null) {
            subscription.dispose(); // stop consuming when the application shuts down
        }
    }
}

This service class implements a reactive Kafka consumer using ReactiveKafkaConsumerTemplate. Upon initialization (@PostConstruct), it starts consuming messages from the subscribed Kafka topic, and the resulting Disposable is kept so the stream can be stopped cleanly on shutdown (@PreDestroy). The receiveAutoAck() method automatically acknowledges messages after they are received. Each message is processed within a reactive pipeline and logged, and any errors encountered during processing are logged as well. This approach ensures non-blocking, scalable message consumption within the application.
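To exercise the consumer end to end, messages must be published to reactor-topic. The following is a minimal sketch, not part of the consumer setup above: it assumes the same broker address and uses Spring Kafka's ReactiveKafkaProducerTemplate together with Reactor Kafka's SenderOptions.

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate() {
        // Assumed producer properties mirroring the configuration shown earlier
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(props));
    }
}

A test message can then be sent with kafkaProducerTemplate.send("reactor-topic", "hello").subscribe() and should show up in the consumer's log output.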

5. Handling Backpressure in Kafka Consumer

Backpressure is essential in reactive systems to prevent overwhelming consumers when processing large volumes of messages. In Kafka, if messages are produced faster than they can be processed, the system may experience memory pressure or slow performance. Reactor provides built-in backpressure strategies that allow us to control the rate of message consumption, ensuring that messages are processed efficiently without overloading system resources.

Flux<String> kafkaFlux = kafkaConsumerTemplate
    .receive()
    .limitRate(10) // Request at most 10 messages at a time from upstream
    .map(receiverRecord -> {
        log.info("Processing message: {}", receiverRecord.value());
        receiverRecord.receiverOffset().acknowledge(); // receive() requires explicit acknowledgment
        return receiverRecord.value();
    });

By fetching a limited number of messages at a time, we keep the system running smoothly and prevent overload, allowing the consumer to absorb traffic spikes more gracefully.
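limitRate() bounds how many records are requested from Kafka, but the processing stage can be bounded as well. Below is a sketch under the assumption of a non-blocking handler; processMessage() is a hypothetical method returning Mono<Void>. The concurrency argument to flatMap() caps how many records are in flight at once:

kafkaConsumerTemplate
    .receive()
    .limitRate(10) // request at most 10 records at a time
    .flatMap(receiverRecord ->
            processMessage(receiverRecord.value()) // hypothetical non-blocking handler (Mono<Void>)
                    .doOnSuccess(v -> receiverRecord.receiverOffset().acknowledge()),
            5) // process at most 5 records concurrently
    .subscribe();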

6. Handling Errors in Reactor Kafka

Handling errors properly in a Kafka consumer is crucial to ensure smooth message processing and prevent unexpected failures. In a reactive system, errors can occur due to network issues, deserialization failures, or Kafka broker unavailability. Instead of stopping the consumer, we can implement strategies to log errors, retry failed messages, or skip problematic records. This helps maintain a resilient and fault-tolerant application that can continue processing messages even when issues arise.

6.1 Implement Retry Strategy

Here is an example solution that implements a retry strategy.

Disposable subscription = kafkaConsumerTemplate
    .receive()
    .map(receiverRecord -> receiverRecord.value())
    .retry(3) // Resubscribe up to 3 times before failing
    .doOnError(e -> log.error("Error consuming Kafka messages", e))
    .subscribe();

This code consumes messages from Kafka and applies a retry strategy to handle failures. If an error terminates the stream, retry(3) resubscribes to the flux up to three times before propagating the failure; note that this restarts the whole pipeline rather than redelivering just the failed message. Any errors are logged via doOnError(), ensuring visibility into issues without silently dropping them. This approach improves fault tolerance by giving transient failures a chance to clear before the consumer gives up.
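Because retry() resubscribes to the entire flux, a single poison message can cause repeated restarts. When the goal is to skip problematic records, as mentioned earlier, failures can instead be isolated per record. This is a sketch with a hypothetical process() handler:

kafkaConsumerTemplate
    .receive()
    .concatMap(receiverRecord ->
            Mono.fromCallable(() -> process(receiverRecord.value())) // hypothetical handler
                    .doOnError(e -> log.error("Skipping record at offset {}", receiverRecord.offset(), e))
                    .onErrorResume(e -> Mono.empty()) // drop the failed record, keep the stream alive
                    .doFinally(signal -> receiverRecord.receiverOffset().acknowledge()))
    .subscribe();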

6.2 Retry Strategies for Fault Tolerance

In some cases, simply retrying a failed operation a fixed number of times may not be enough, especially when dealing with temporary network issues or Kafka broker delays. A more effective approach is to introduce delayed retries with exponential backoff, where each retry attempt is delayed progressively longer. This prevents an excessive load on the system and gives external dependencies time to recover.

Disposable subscription = kafkaConsumerTemplate
    .receive()
    .map(receiverRecord -> receiverRecord.value())
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(2))) // up to 3 retries with ~2s, 4s, 8s delays
    .doOnError(e -> log.error("Error with backoff", e))
    .subscribe();

This code consumes messages from Kafka and applies a retry strategy with exponential backoff. If an error occurs, retryWhen(Retry.backoff(3, Duration.ofSeconds(2))) retries the operation up to three times, introducing an initial 2-second delay between retries, which increases exponentially. This approach helps prevent overwhelming Kafka or external systems by spacing out retry attempts.
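The backoff spec can be tightened further. As an illustrative sketch (the exception type and the 20-second cap are assumptions, not recommendations), retries can be capped and restricted to errors that are likely transient:

Disposable subscription = kafkaConsumerTemplate
    .receive()
    .map(receiverRecord -> receiverRecord.value())
    .retryWhen(Retry.backoff(3, Duration.ofSeconds(2))
            .maxBackoff(Duration.ofSeconds(20))          // cap the exponential delay at 20 seconds
            .filter(e -> e instanceof TimeoutException)) // retry only errors assumed to be transient
    .doOnError(e -> log.error("Retries exhausted or non-retryable error", e))
    .subscribe();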

7. Conclusion

In this article, we explored how to create Kafka consumers using Reactor Kafka in a Spring Boot application. We started by configuring the Kafka consumer with ReceiverOptions and ReactiveKafkaConsumerTemplate, then implemented a reactive consumer service to process messages efficiently. We also covered important aspects such as handling backpressure, managing errors, and implementing retry strategies to ensure fault tolerance. By leveraging Project Reactor, our solution enables non-blocking, scalable, and resilient message consumption.

8. Download the Source Code

This article covered how to create consumers using Reactor Kafka.

You can download the full source code of this example here: kafka reactor create consumers

Omozegie Aziegbe

Omos Aziegbe is a technical writer and web/application developer with a BSc in Computer Science and Software Engineering from the University of Bedfordshire. Specializing in Java enterprise applications with the Jakarta EE framework, Omos also works with HTML5, CSS, and JavaScript for web development. As a freelance web developer, Omos combines technical expertise with research and writing on topics such as software engineering, programming, web application development, computer science, and technology.