Creating Kafka Consumers With Reactor Kafka
Reactor Kafka provides a reactive API that integrates Apache Kafka with Project Reactor, enabling developers to create non-blocking, back-pressure-aware Kafka consumers and producers. This reactive approach is particularly valuable for high-throughput systems where traditional blocking I/O operations might become a bottleneck. This article will explore how to create Kafka consumers using Reactor Kafka.
1. Setting Up Dependencies
First, set up a Spring Boot project and include the required dependencies. Our project will need Spring Kafka and Reactor Kafka.
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Reactor Kafka -->
    <dependency>
        <groupId>io.projectreactor.kafka</groupId>
        <artifactId>reactor-kafka</artifactId>
    </dependency>
2. Configuring the Kafka Consumer
Kafka consumers require bootstrap servers, a group ID, and deserializers. These properties are defined in application.yml.
The bootstrap-servers property specifies the Kafka broker addresses, while group-id sets the consumer group name (reactor-consumer-group here). The auto-offset-reset option, set to earliest, makes the consumer start reading from the beginning of the topic when the group has no committed offset. The key-deserializer and value-deserializer properties determine how message keys and values are deserialized.
    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: reactor-consumer-group
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        properties:
          spring:
            json:
              trusted:
                packages: '*'
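To make the effect of auto-offset-reset concrete, here is a minimal sketch in plain Java of how a starting offset could be chosen. It is a simplified model for illustration, not Kafka's implementation; the class and method names (OffsetResetSketch, startingOffset) are hypothetical.

```java
import java.util.OptionalLong;

// Hypothetical model of the auto.offset.reset decision; not Kafka's actual code.
final class OffsetResetSketch {

    /**
     * Returns the offset a consumer would start from.
     * A committed offset always wins; the reset policy only applies when none exists.
     */
    static long startingOffset(OptionalLong committed, String autoOffsetReset,
                               long beginningOffset, long endOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return beginningOffset; // read the topic from the start
            case "latest":
                return endOffset;       // read only records produced from now on
            default:
                // Models the "none" policy: fail when there is no committed offset
                throw new IllegalStateException(
                        "No committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }
}
```

For example, startingOffset(OptionalLong.empty(), "earliest", 0, 42) returns 0, while startingOffset(OptionalLong.of(17), "earliest", 0, 42) returns 17 because a committed offset takes precedence.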
3. Kafka Consumer Configuration
To consume messages reactively with Reactor Kafka, we need to configure a Kafka consumer with the appropriate properties. This includes specifying the bootstrap servers, consumer group ID, and deserialization strategy. Additionally, we will define a ReceiverOptions bean that holds these configurations and a ReactiveKafkaConsumerTemplate bean, which serves as the primary interface for consuming messages in a reactive manner.
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
    import reactor.kafka.receiver.ReceiverOptions;

    @Configuration
    public class KafkaConsumerConfig {

        @Bean
        public ReceiverOptions<String, String> receiverOptions() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "reactor-consumer-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            // Configure ReceiverOptions with topic subscription
            return ReceiverOptions.<String, String>create(props)
                    .subscription(Collections.singleton("reactor-topic"));
        }

        @Bean
        public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(
                ReceiverOptions<String, String> receiverOptions) {
            return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
        }
    }
This configuration class sets up the Kafka consumer using Reactor Kafka. The receiverOptions() method defines consumer properties, including the Kafka broker address, consumer group ID, deserializers, and auto-offset reset policy, then subscribes to the "reactor-topic" topic. The kafkaConsumerTemplate() method creates a ReactiveKafkaConsumerTemplate, which serves as the primary interface for consuming messages reactively.
4. Creating the Reactor Kafka Consumer
With the Kafka consumer configuration in place, we now implement the consumer service that will listen for messages from the specified topic. Using ReactiveKafkaConsumerTemplate, we can consume messages reactively, ensuring non-blocking processing. The service will subscribe to the Kafka topic, process incoming messages, and log them while handling potential errors. We use receiveAutoAck(), which acknowledges each polled batch once it has been emitted and commits offsets periodically, so the consumer gets at-least-once delivery without acknowledging records by hand.
    import jakarta.annotation.PostConstruct; // javax.annotation.PostConstruct on Spring Boot 2.x
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
    import org.springframework.stereotype.Service;
    import reactor.core.publisher.Flux;

    @Service
    public class ReactiveKafkaConsumerService {

        private static final Logger log = LoggerFactory.getLogger(ReactiveKafkaConsumerService.class);

        private final ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate;

        public ReactiveKafkaConsumerService(ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate) {
            this.kafkaConsumerTemplate = kafkaConsumerTemplate;
        }

        @PostConstruct
        public void startConsumer() {
            Flux<String> kafkaFlux = kafkaConsumerTemplate
                    .receiveAutoAck()
                    .map(consumerRecord -> {
                        log.info("Received message: {}", consumerRecord.value());
                        return consumerRecord.value();
                    })
                    .doOnError(e -> log.error("Error processing message", e));

            // Nothing is consumed until the pipeline is subscribed
            kafkaFlux.subscribe();
        }
    }
This service class implements a reactive Kafka consumer using ReactiveKafkaConsumerTemplate. Upon initialization (@PostConstruct), it starts consuming messages from the subscribed Kafka topic. The receiveAutoAck() method ensures that messages are automatically acknowledged after being received. Each message is logged and mapped to its value within a reactive pipeline, producing a Flux<String>. Any error signalled by the pipeline is logged by doOnError(), although the error still terminates the stream unless a retry or recovery operator is added. This approach ensures non-blocking, scalable message consumption within the application.
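The at-least-once behavior mentioned above can be illustrated with a small model in plain Java. This is a sketch under simplifying assumptions (a single in-memory log, offsets committed after every fixed-size batch), not how Reactor Kafka is implemented; AtLeastOnceSketch and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of commit-after-processing; not Reactor Kafka code.
final class AtLeastOnceSketch {
    private final List<String> log;       // stand-in for one topic partition
    private int committedOffset = 0;      // where consumption resumes after a restart
    final List<String> processed = new ArrayList<>();

    AtLeastOnceSketch(List<String> log) {
        this.log = log;
    }

    /**
     * Processes records starting at the committed offset and commits after
     * every batchSize records. A non-negative failAtOffset simulates a crash
     * just before that offset is processed. Returns true if the end of the
     * log was reached.
     */
    boolean run(int batchSize, int failAtOffset) {
        int position = committedOffset;
        int sinceCommit = 0;
        while (position < log.size()) {
            if (position == failAtOffset) {
                return false; // crash: progress since the last commit is forgotten
            }
            processed.add(log.get(position));
            position++;
            if (++sinceCommit == batchSize) {
                committedOffset = position; // commit only after processing
                sinceCommit = 0;
            }
        }
        committedOffset = position;
        return true;
    }
}
```

With a log of a, b, c, d, e, a batch size of 2, and a simulated crash at offset 3, the first run processes a, b, c but has only committed up to offset 2. The second run therefore resumes at c, so c is processed twice and nothing is skipped, which is why downstream processing should tolerate duplicates.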
5. Handling Backpressure in Kafka Consumer
Backpressure is essential in reactive systems to prevent overwhelming consumers when processing large volumes of messages. In Kafka, if messages are produced faster than they can be processed, the system may experience memory pressure or slow performance. Reactor provides built-in backpressure strategies that allow us to control the rate of message consumption, ensuring that messages are processed efficiently without overloading system resources.
    Flux<String> kafkaFlux = kafkaConsumerTemplate
            .receive()
            .limitRate(10) // Request records from upstream in batches of at most 10
            .map(consumerRecord -> {
                log.info("Processing message: {}", consumerRecord.value());
                return consumerRecord.value();
            });
By capping how many records the pipeline requests from the receiver at a time, limitRate() keeps the amount of in-flight work bounded, so the consumer copes better with traffic spikes instead of buffering messages faster than it can process them.
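The idea of bounded demand can be sketched without Reactor, using the JDK's java.util.concurrent.Flow interfaces. The example below is a simplified illustration: the subscriber asks for a fixed batch and re-requests only once that batch is consumed, whereas Reactor's limitRate() uses its own replenishing strategy. All class names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical sketch of batched demand with JDK Flow; not Reactor's limitRate().
final class BatchedDemandSketch {

    /** Synchronous publisher over 0..count-1 that emits only what was requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;
        final List<Long> requests = new ArrayList<>(); // every request(n) received

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private long demand = 0;
                private boolean draining = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    requests.add(n);
                    demand += n;
                    if (draining) {
                        return; // re-entrant call from onNext; the outer loop continues
                    }
                    draining = true;
                    while (demand > 0 && next < count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    draining = false;
                    if (next == count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Requests batchSize items, then asks for more once the batch is consumed. */
    static final class BatchingSubscriber implements Flow.Subscriber<Integer> {
        private final int batchSize;
        private Flow.Subscription subscription;
        private int receivedInBatch = 0;
        final List<Integer> received = new ArrayList<>();
        boolean completed = false;

        BatchingSubscriber(int batchSize) {
            this.batchSize = batchSize;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(batchSize); // never request an unbounded amount
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            if (++receivedInBatch == batchSize) {
                receivedInBatch = 0;
                subscription.request(batchSize);
            }
        }

        @Override
        public void onError(Throwable t) {
            throw new IllegalStateException(t);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }
}
```

Subscribing a BatchingSubscriber with a batch size of 10 to a RangePublisher of 25 items delivers all 25 items, but the publisher only ever sees three requests of 10, so at most 10 items are outstanding at any time.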
6. Handling Errors in Reactor Kafka
Handling errors properly in a Kafka consumer is crucial to ensure smooth message processing and prevent unexpected failures. In a reactive system, errors can occur due to network issues, deserialization failures, or Kafka broker unavailability. Instead of stopping the consumer, we can implement strategies to log errors, retry failed messages, or skip problematic records. This helps maintain a resilient and fault-tolerant application that can continue processing messages even when issues arise.
6.1 Implement Retry Strategy
Here is an example solution that implements a retry strategy.
    // subscribe() returns a reactor.core.Disposable, not a Flux
    Disposable subscription = kafkaConsumerTemplate
            .receive()
            .retry(3) // Resubscribe up to 3 times if the stream signals an error
            .doOnError(e -> log.error("Error consuming Kafka messages", e))
            .subscribe();
This code consumes messages from Kafka and applies a retry strategy to handle failures. If the stream signals an error, retry(3) resubscribes to the receiver up to three times before propagating the error; it restarts consumption rather than reprocessing a single message. Because doOnError() is placed after retry(3), it logs only the error that remains once the retries are exhausted. This approach improves fault tolerance by giving transient failures a chance to clear before the consumer stops.
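The counting semantics of a fixed retry can be sketched in plain Java: a limit of 3 retries means up to 4 attempts in total (the original attempt plus three more). This is an illustrative model of the idea, not Reactor's implementation; FixedRetrySketch and callWithRetry are hypothetical names.

```java
import java.util.function.Supplier;

// Hypothetical model of "retry up to N times"; not Reactor's retry operator.
final class FixedRetrySketch {

    /**
     * Invokes the source and, if it throws, invokes it again from scratch,
     * up to maxRetries additional times. The last failure is rethrown.
     */
    static <T> T callWithRetry(Supplier<T> source, int maxRetries) {
        int retries = 0;
        while (true) {
            try {
                return source.get();
            } catch (RuntimeException e) {
                if (retries++ >= maxRetries) {
                    throw e; // retries exhausted: propagate the error
                }
                // otherwise loop and start the source again
            }
        }
    }
}
```

A source that fails twice and then succeeds returns its value on the third attempt. A source that always fails is invoked four times with maxRetries set to 3, after which the exception reaches the caller, which is the point where a downstream error handler would finally run.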
6.2 Retry Strategies for Fault Tolerance
In some cases, simply retrying a failed operation a fixed number of times may not be enough, especially when dealing with temporary network issues or Kafka broker delays. A more effective approach is to introduce delayed retries with exponential backoff, where each retry attempt is delayed progressively longer. This prevents an excessive load on the system and gives external dependencies time to recover.
    // subscribe() returns a reactor.core.Disposable, not a Flux
    Disposable subscription = kafkaConsumerTemplate
            .receive()
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(2))) // reactor.util.retry.Retry
            .doOnError(e -> log.error("Error with backoff", e))
            .subscribe();
This code consumes messages from Kafka and applies a retry strategy with exponential backoff. If an error occurs, retryWhen(Retry.backoff(3, Duration.ofSeconds(2))) resubscribes up to three times, starting with a delay of about 2 seconds that grows exponentially with each attempt; Reactor also applies random jitter to each delay by default. This approach helps prevent overwhelming Kafka or external systems by spacing out retry attempts.
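To see how the delays grow, the following plain-Java sketch computes a nominal doubling schedule with an upper cap. It deliberately ignores jitter and is only an approximation of what a backoff policy does, not Reactor's implementation; BackoffScheduleSketch and nominalDelays are hypothetical names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an exponential backoff schedule without jitter.
final class BackoffScheduleSketch {

    /**
     * Returns the nominal delay before each retry: minBackoff, then double the
     * previous delay, never exceeding maxBackoff.
     */
    static List<Duration> nominalDelays(int maxAttempts, Duration minBackoff, Duration maxBackoff) {
        List<Duration> delays = new ArrayList<>();
        Duration delay = minBackoff;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (delay.compareTo(maxBackoff) > 0) {
                delay = maxBackoff; // cap the delay
            }
            delays.add(delay);
            if (delay.compareTo(maxBackoff) < 0) {
                delay = delay.multipliedBy(2); // exponential growth
            }
        }
        return delays;
    }
}
```

For three attempts starting at 2 seconds with a generous cap, the schedule is 2, 4, and 8 seconds. With a cap of 5 seconds it becomes 2, 4, and 5 seconds.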
7. Conclusion
In this article, we explored how to create Kafka consumers using Reactor Kafka in a Spring Boot application. We started by configuring the Kafka consumer with ReceiverOptions and ReactiveKafkaConsumerTemplate, then implemented a reactive consumer service to process messages efficiently. We also covered important aspects such as handling backpressure, managing errors, and implementing retry strategies to ensure fault tolerance. By leveraging Project Reactor, our solution enables non-blocking, scalable, and resilient message consumption.