Working With Reactive Kafka Stream and Spring WebFlux
In modern application development, the need for real-time data processing and reactive programming has become increasingly important. Reactive programming allows developers to build non-blocking, asynchronous, and event-driven applications that can handle a large number of concurrent requests with minimal resource consumption. Two popular technologies that enable reactive programming are Apache Kafka and Spring WebFlux. In the context of Java Spring WebFlux and Reactive Kafka, these tools provide a powerful combination for building scalable, real-time streaming applications. This article will explore how to work with Reactive Kafka Streams and Spring WebFlux.
1. Introduction to Reactive Programming
Reactive programming is a programming paradigm that focuses on asynchronous data streams and the propagation of change. It is particularly well-suited for applications that need to handle a large number of concurrent requests, such as real-time data processing, IoT applications, and microservices architectures.
The key principles of reactive programming are:
- Asynchronous: Operations are non-blocking and can execute independently of the main program flow.
- Event-driven: Applications react to events or changes in data, rather than polling or waiting for data to become available.
- Backpressure: The ability to handle situations where the producer of data is faster than the consumer, ensuring that the consumer is not overwhelmed.
Reactive programming is often implemented using libraries such as Project Reactor (used by Spring WebFlux) or RxJava.
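To make these principles concrete, here is a small, self-contained sketch using Project Reactor (the library behind Spring WebFlux). It is illustrative only: the interval stream stands in for any asynchronous event source, and the subscriber requests elements in small batches, which is how backpressure is expressed in Reactive Streams.

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import java.time.Duration;

public class ReactiveBasicsExample {

    public static void main(String[] args) throws InterruptedException {
        // An asynchronous, event-driven stream: one element every 100 ms, ten in total
        Flux<Long> events = Flux.interval(Duration.ofMillis(100)).take(10);

        // A subscriber that applies backpressure by controlling its own demand
        events.subscribe(new BaseSubscriber<Long>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(2); // initial demand: two elements
            }

            @Override
            protected void hookOnNext(Long value) {
                System.out.println("Processed event: " + value);
                request(1); // ask for the next element only when ready
            }
        });

        // Keep the main thread alive long enough for the asynchronous stream to finish
        Thread.sleep(2000);
    }
}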
2. Overview of Apache Kafka and Kafka Streams
2.1 Apache Kafka
Apache Kafka is a distributed streaming platform that allows us to publish, subscribe to, store, and process streams of records in real time. It is designed to handle high-throughput, low-latency data streams and is commonly used for building real-time data pipelines and streaming applications.
Kafka is based on a publish-subscribe model, where producers write data to topics, and consumers read data from topics. Kafka topics are partitioned and replicated across multiple brokers, providing fault tolerance and scalability.
2.2 Kafka Streams
Kafka Streams is a client library for building stream processing applications that transform, aggregate, and enrich data in real time. It allows us to process data streams using a high-level DSL (Domain Specific Language) or a lower-level Processor API.
Kafka Streams is tightly integrated with Kafka and provides features such as:
- Stateful processing: Maintains state across multiple records, enabling operations like windowed aggregations and joins.
- Fault tolerance: Automatically handles failures and ensures exactly once processing semantics.
- Scalability: Scales horizontally by distributing processing across multiple instances.
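To give a flavour of the high-level DSL mentioned above, the following sketch builds a minimal Kafka Streams topology that reads from an input topic, upper-cases each value, and writes the result to an output topic. The application id and topic names (uppercase-example, input-topic, output-topic) are made up for the example.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class UppercaseStreamExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "uppercase-example"); // hypothetical application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Read from "input-topic", upper-case each value, write to "output-topic"
        KStream<String, String> source = builder.stream("input-topic");
        source.mapValues(value -> value.toUpperCase())
              .to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close the streams application cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}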
3. Introduction to Spring WebFlux
Spring WebFlux is a reactive web framework that supports non-blocking, asynchronous programming. It is built on top of Project Reactor, which provides the reactive streams implementation. Spring WebFlux is designed to handle a large number of concurrent connections with minimal resource consumption, making it ideal for building reactive microservices and real-time applications.
Key features of Spring WebFlux include:
- Reactive Programming Model: Supports reactive streams and non-blocking I/O.
- Functional Programming: Allows us to define routes and handlers using functional programming constructs.
- Integration with Reactive Libraries: Integrates with other reactive libraries such as Reactive Kafka, Reactive MongoDB, and more.
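As a brief illustration of the functional, reactive programming model described above, routes can be declared as a RouterFunction bean instead of annotated controllers. This is only a sketch; the /hello path and response body are invented for illustration.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class RouterConfig {

    @Bean
    public RouterFunction<ServerResponse> helloRoute() {
        // Functional route: GET /hello returns a non-blocking response
        return route(GET("/hello"),
                request -> ServerResponse.ok().bodyValue("Hello from WebFlux"));
    }
}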
4. Setting Up a Reactive Kafka Stream with Spring WebFlux
To get started with Reactive Kafka Streams and Spring WebFlux, we need to set up a Spring Boot project with the necessary dependencies. We can create a Spring Boot project using Spring Initializr or our favourite IDE and add the following dependencies:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
</dependency>
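Note that the ReactiveKafkaConsumerTemplate and ReactiveKafkaProducerTemplate used later in this article are provided by Spring for Apache Kafka and built on Reactor Kafka. Depending on your Spring Boot version and how transitive dependencies resolve, you may also need the following (versions are assumed to be managed by the Spring Boot BOM):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
</dependency>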
4.1 Configure Kafka Properties
In your application.yml or application.properties file, configure the Kafka properties:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    properties:
      spring:
        json:
          trusted:
            packages: '*'
The above YAML configuration sets up Spring Kafka for the Spring Boot application, defining both producer and consumer properties. The bootstrap-servers field connects to a local Kafka broker at localhost:9092. The consumer is assigned to the "my-group" consumer group and starts reading messages from the earliest available offset. The producer uses String serialization for both keys and values to ensure proper message encoding. Additionally, spring.json.trusted.packages: '*' allows deserialization from any package, enabling flexible JSON handling.
5. Building a Reactive Kafka Consumer with Spring WebFlux
To build a reactive Kafka consumer, we can use the ReactiveKafkaConsumerTemplate provided by the Reactive Kafka library. Firstly, create a Kafka consumer configuration: define a configuration class to initialize and set up the Kafka consumer, specifying properties such as the bootstrap servers, consumer group ID, key/value deserializers, and reactive Kafka settings.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Consumer properties used to create the reactive receiver
    @Bean
    public ReceiverOptions<String, String> receiverOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return ReceiverOptions.create(props);
    }

    // Reactive consumer template built from the receiver options
    @Bean
    public ReactiveKafkaConsumerTemplate<String, String> reactiveKafkaConsumerTemplate(
            ReceiverOptions<String, String> receiverOptions) {
        return new ReactiveKafkaConsumerTemplate<>(receiverOptions);
    }
}
This configuration class sets up a reactive Kafka consumer using Reactor Kafka. The bootstrapServers field is injected with the Kafka broker address from application.yml using @Value("${spring.kafka.bootstrap-servers}"). This allows the consumer to connect to the correct Kafka instance dynamically.
The receiverOptions() method creates a ReceiverOptions<String, String> bean, which holds consumer configuration properties. These properties include the Kafka broker address, the consumer group ID ("my-group"), and deserializers (StringDeserializer.class) for both keys and values, ensuring that messages are properly converted from byte arrays to String format.
The reactiveKafkaConsumerTemplate() method defines a reactive Kafka consumer using ReactiveKafkaConsumerTemplate. This bean is built using the configured ReceiverOptions, allowing the application to consume Kafka messages reactively, supporting backpressure and non-blocking streaming.
5.1 Create a Kafka Consumer Service
To consume messages from Kafka in a non-blocking, event-driven manner, we use Reactor Kafka with Spring WebFlux. The following service class dynamically subscribes to a Kafka topic and processes messages reactively using ReactiveKafkaConsumerTemplate.
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverOptions;
import java.util.Collections;

@Service
public class KafkaConsumerService {

    private final ReceiverOptions<String, String> receiverOptions;

    public KafkaConsumerService(ReceiverOptions<String, String> receiverOptions) {
        this.receiverOptions = receiverOptions;
    }

    public Flux<String> consumeMessages(String topic) {
        // Create a new ReceiverOptions with the specified topic
        ReceiverOptions<String, String> options = receiverOptions
                .subscription(Collections.singletonList(topic));

        // Subscribe to the specified topic
        ReactiveKafkaConsumerTemplate<String, String> consumerTemplate =
                new ReactiveKafkaConsumerTemplate<>(options);

        // Consume messages from the topic
        return consumerTemplate
                .receiveAutoAck()
                .doOnNext(record -> System.out.println("Received message: " + record.value()))
                .map(ConsumerRecord::value)
                .onErrorResume(e -> {
                    System.err.println("Error consuming message: " + e.getMessage());
                    return Mono.empty();
                });
    }
}
This service class enables reactive message consumption from Kafka topics. It first initializes ReceiverOptions, specifying the topic to subscribe to. Then, it creates a ReactiveKafkaConsumerTemplate to consume messages asynchronously. The receiveAutoAck() method retrieves messages while automatically acknowledging them. Each received message is logged, mapped to extract its value, and returned as a Flux<String>. If an error occurs, onErrorResume() ensures that the stream continues running without crashing.
5.2 Expose the Kafka Consumer via a REST Endpoint
Create a REST controller to expose the Kafka consumer:
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/kafka")
public class KafkaController {

    private final KafkaConsumerService kafkaConsumerService;

    public KafkaController(KafkaConsumerService kafkaConsumerService) {
        this.kafkaConsumerService = kafkaConsumerService;
    }

    // Streams consumed messages to the client as server-sent events
    @GetMapping(value = "/consume", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> consumeMessages() {
        return kafkaConsumerService.consumeMessages("my-topic");
    }
}
6. Building a Reactive Kafka Producer with Spring WebFlux
To build a reactive Kafka producer, we can use ReactiveKafkaProducerTemplate from the Reactive Kafka library, starting by defining a configuration class to set up the producer.
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import reactor.kafka.sender.SenderOptions;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Producer properties used to create the reactive sender
    @Bean
    public SenderOptions<String, String> senderOptions() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return SenderOptions.create(props);
    }

    // Reactive producer template built from the sender options
    @Bean
    public ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate(
            SenderOptions<String, String> senderOptions) {
        return new ReactiveKafkaProducerTemplate<>(senderOptions);
    }
}
This Kafka producer configuration class is responsible for setting up a reactive Kafka producer using ReactiveKafkaProducerTemplate. The bootstrapServers field is injected with the Kafka broker address from application.yml using @Value("${spring.kafka.bootstrap-servers}"). This ensures that the producer dynamically connects to the configured Kafka instance without hardcoding the broker address.
The senderOptions() method creates and configures a SenderOptions<String, String> bean, which defines key producer properties. The BOOTSTRAP_SERVERS_CONFIG property specifies the Kafka broker's address, while KEY_SERIALIZER_CLASS_CONFIG and VALUE_SERIALIZER_CLASS_CONFIG are set to StringSerializer.class, ensuring that both keys and values of messages are serialized as Strings before being sent to Kafka.
The reactiveKafkaProducerTemplate() method creates a ReactiveKafkaProducerTemplate<String, String> bean, which is responsible for sending messages to Kafka in a reactive and non-blocking manner. This template is built using the configured SenderOptions, ensuring that all producer settings are applied.
6.1 Create a Kafka Producer Service
Create a service that produces messages to a Kafka topic:
import org.springframework.kafka.core.reactive.ReactiveKafkaProducerTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Mono;

@Service
public class KafkaProducerService {

    private final ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate;

    public KafkaProducerService(ReactiveKafkaProducerTemplate<String, String> reactiveKafkaProducerTemplate) {
        this.reactiveKafkaProducerTemplate = reactiveKafkaProducerTemplate;
    }

    // Sends a message to the given topic and completes when the send finishes
    public Mono<Void> sendMessage(String topic, String message) {
        return reactiveKafkaProducerTemplate.send(topic, message)
                .doOnSuccess(result -> System.out.println("Message sent successfully: " + message))
                .doOnError(e -> System.err.println("Error sending message: " + e.getMessage()))
                .then();
    }
}
This Kafka producer service is responsible for sending messages to a Kafka topic in a reactive and non-blocking manner using ReactiveKafkaProducerTemplate. The class injects an instance of ReactiveKafkaProducerTemplate<String, String> through its constructor. The sendMessage(String topic, String message) method takes a Kafka topic and a message as parameters and returns a Mono<Void>, indicating an asynchronous operation that completes when the message is sent.
Inside the method, reactiveKafkaProducerTemplate.send(topic, message) sends the message to Kafka. The doOnSuccess(result -> System.out.println(...)) callback logs a success message when the message is published, while doOnError(e -> System.err.println(...)) logs any errors that occur during the process. The .then() ensures that the method returns a Mono<Void>, meaning it does not emit any value but simply signals completion or error.
6.2 Expose the Kafka Producer via a REST Endpoint
Create a REST controller to expose the Kafka producer:
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
@RequestMapping("/kafka")
public class KafkaProducerController {

    private final KafkaProducerService kafkaProducerService;

    public KafkaProducerController(KafkaProducerService kafkaProducerService) {
        this.kafkaProducerService = kafkaProducerService;
    }

    // Accepts a message as a request parameter and publishes it to Kafka
    @PostMapping("/produce")
    public Mono<Void> produceMessage(@RequestParam String message) {
        return kafkaProducerService.sendMessage("my-topic", message);
    }
}
This Kafka controller exposes an endpoint for producing messages to a Kafka topic. The controller injects an instance of KafkaProducerService through its constructor, ensuring that it can access the reactive Kafka producer for sending messages. The produceMessage(@RequestParam String message) method is mapped to a POST request at /kafka/produce, allowing clients to send messages via an HTTP request. It takes a message as a request parameter and calls kafkaProducerService.sendMessage("my-topic", message), which asynchronously sends the message to Kafka.
Since the method returns a Mono<Void>, it follows reactive principles, meaning the request completes when the message is successfully sent or an error occurs. This non-blocking approach ensures that the API can handle multiple requests efficiently.
7. Testing the Consumer Endpoint
Creating a Kafka Topic Manually
To test the consumer with a specific topic, you need to ensure the topic exists in your Kafka broker. You can create a Kafka topic manually using the kafka-topics.sh script provided by Kafka. If you have Kafka installed locally, navigate to the Kafka bin directory and use the kafka-topics.sh script to create a new topic. For example, to create a topic named my-topic with 1 partition and a replication factor of 1:
./kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
You can verify that the topic was created successfully by listing all topics:
./kafka-topics.sh --list --bootstrap-server localhost:9092
You should see my-topic in the list of topics.
Start the Consumer
Open a terminal and run the following curl command to consume messages from a specific topic (e.g., my-topic):
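A command along these lines should work, assuming the application is running locally on the default port 8080 (the -N flag disables curl's output buffering so each server-sent event is printed as soon as it arrives):

curl -N http://localhost:8080/kafka/consume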
Send Messages to the Producer
Open another terminal and send messages to the my-topic topic using the producer endpoint:
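For example, assuming the same locally running application on port 8080, messages can be posted with requests like these (the message values are arbitrary):

curl -X POST "http://localhost:8080/kafka/produce?message=TestMessage1"
curl -X POST "http://localhost:8080/kafka/produce?message=TestMessage2"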
Observe the Consumer Output
In the first terminal (where the consumer is running), you should see the messages appear in real-time:
data:TestMessage2
data:TestMessage1
8. Handling Backpressure in Reactive Kafka Streams
Backpressure is a critical concept in reactive programming, where the consumer needs to control the rate at which it receives data from the producer to avoid being overwhelmed. In the context of Kafka, backpressure can be handled using the onBackpressureBuffer, onBackpressureDrop, or onBackpressureLatest operators provided by Project Reactor.
For example, we can apply backpressure to the Kafka consumer as follows:
public Flux<String> consumeMessages(String topic) {
    return reactiveKafkaConsumerTemplate
            .receiveAutoAck()
            .doOnNext(record -> System.out.println("Received message: " + record.value()))
            .map(ConsumerRecord::value)
            .onBackpressureBuffer(1000) // Buffer up to 1000 messages
            .onErrorResume(e -> {
                System.err.println("Error consuming message: " + e.getMessage());
                return Mono.empty();
            });
}
9. Error Handling and Retry Mechanisms
In a reactive Kafka stream, it is essential to handle errors gracefully and implement retry mechanisms to ensure the reliability of the application.
9.1 Error Handling
We can handle errors using the onErrorResume or onErrorContinue operators:
public Flux<String> consumeMessages(String topic) {
    return reactiveKafkaConsumerTemplate
            .receiveAutoAck()
            .doOnNext(record -> System.out.println("Received message: " + record.value()))
            .map(ConsumerRecord::value)
            .onErrorResume(e -> {
                System.err.println("Error consuming message: " + e.getMessage());
                return Mono.empty();
            });
}
9.2 Retry Mechanism
We can implement a retry mechanism using the retry or retryWhen operators:
public Flux<String> consumeMessages(String topic) {
    return reactiveKafkaConsumerTemplate
            .receiveAutoAck()
            .doOnNext(record -> System.out.println("Received message: " + record.value()))
            .map(ConsumerRecord::value)
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
            .onErrorResume(e -> {
                System.err.println("Error consuming message: " + e.getMessage());
                return Mono.empty();
            });
}
10. Conclusion
In this article, we explored how to work with Reactive Kafka Streams and Java Spring WebFlux to build a reactive, real-time data processing application. We covered the basics of reactive programming, Kafka, and Spring WebFlux, and demonstrated how to set up a reactive Kafka consumer and producer. By leveraging the power of reactive programming, we can build highly scalable and efficient applications that can handle large volumes of data in real-time. With the combination of Kafka and Spring WebFlux, we can create event-driven systems that are well-suited for modern application development.
11. Download the Source Code
This article covered integrating Java Spring WebFlux with Reactive Kafka Streams.
You can download the full source code of this example here: java spring webflux reactive kafka