Handling Kafka Producer Retries
In distributed messaging systems such as Apache Kafka, it’s essential to ensure that messages are delivered reliably, even when facing transient failures. Kafka provides a retry mechanism that lets producers resend messages when an initial send fails because of issues like network instability or broker unavailability. This article covers how producer retries work, the settings that control them, and sample Kafka producer code that uses them.
1. Understanding Kafka Producer Retries
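Kafka producers can automatically retry sending messages when an error occurs. The retry mechanism is designed for temporary failures. Only errors the client classifies as retriable are retried. In the Java client these are exceptions extending RetriableException, such as NotLeaderOrFollowerException or a request TimeoutException. Errors such as RecordTooLargeException fail immediately. When a producer sends a message, the broker either acknowledges it (if successful) or returns an error. The client reports the outcome through the Future returned by send() or through an optional callback.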
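The retry decision described above can be modelled with a small loop. This is a hypothetical sketch, not Kafka's code. TransientSendException and PermanentSendException are made-up stand-ins for retriable errors (subclasses of RetriableException in the real client) and non-retriable ones. sendWithRetries makes one initial attempt plus up to `retries` more, but only while the failure is transient.

```java
import java.util.function.Supplier;

// Hypothetical model of a producer-style retry loop; not Kafka's implementation.
public class RetriableSendSketch {

    // Stand-in for a transient (retriable) error such as a leader change.
    static class TransientSendException extends RuntimeException {
        TransientSendException(String message) { super(message); }
    }

    // Stand-in for a permanent error such as an oversized record.
    static class PermanentSendException extends RuntimeException {
        PermanentSendException(String message) { super(message); }
    }

    // One initial attempt plus up to `retries` more, retrying transient failures only.
    // PermanentSendException is not caught, so it propagates on its first occurrence.
    static String sendWithRetries(Supplier<String> send, int retries) {
        int retriesUsed = 0;
        while (true) {
            try {
                return send.get();
            } catch (TransientSendException e) {
                if (retriesUsed >= retries) {
                    throw e; // retry budget exhausted: surface the last error
                }
                retriesUsed++;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated send that fails twice with a transient error, then succeeds
        String result = sendWithRetries(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new TransientSendException("broker temporarily unavailable");
            }
            return "ack";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Running main prints `ack after 3 attempts`. A PermanentSendException would propagate on the first attempt without any retry, mirroring how non-retriable errors bypass the retry budget.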
Retries help ensure that a message is not lost because of a temporary issue. They are enabled by default. Their behaviour is controlled by a few important producer settings:
retries (defaults to Integer.MAX_VALUE): The number of times the producer retries a failed send. The default is effectively unlimited. In practice, retrying stops when the message is sent or when delivery.timeout.ms is reached.
delivery.timeout.ms (defaults to 120,000 ms, or 2 minutes): An upper bound on the time the producer takes to report success or failure for a record after send() returns. It covers time spent waiting to be sent, time waiting for the broker's acknowledgement, and time spent retrying. Once this timeout is reached, the producer stops retrying, even if retries has not been exhausted.
retry.backoff.ms (defaults to 100 ms): The time, in milliseconds, that the producer waits before retrying a failed request. This backoff keeps the producer from flooding the broker with retry requests during a temporary failure.
retry.backoff.max.ms (defaults to 1,000 ms): The maximum time, in milliseconds, the producer backs off before the next retry attempt. In client versions that support this setting, the backoff grows exponentially from retry.backoff.ms after repeated failures but never exceeds this value.
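To make the two backoff settings concrete, the following sketch computes the delay before each retry. It assumes the backoff starts at retry.backoff.ms, doubles after each failed attempt, and is capped at retry.backoff.max.ms. The real client also adds random jitter, which is omitted here, and older clients use a fixed retry.backoff.ms delay. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a capped exponential backoff schedule (jitter omitted).
public class BackoffScheduleSketch {

    // Delay before retry number retryIndex (0-based): initialMs * 2^retryIndex, capped at maxMs.
    static long backoffMs(int retryIndex, long initialMs, long maxMs) {
        long delay = initialMs;
        // Stop doubling once the cap is reached, which also avoids overflow
        for (int i = 0; i < retryIndex && delay < maxMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMs);
    }

    // Backoff delays for the first `retries` retries.
    static List<Long> schedule(int retries, long initialMs, long maxMs) {
        List<Long> delays = new ArrayList<>();
        for (int r = 0; r < retries; r++) {
            delays.add(backoffMs(r, initialMs, maxMs));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Defaults: retry.backoff.ms = 100, retry.backoff.max.ms = 1,000
        System.out.println(schedule(6, 100, 1_000));
    }
}
```

Running main prints `[100, 200, 400, 800, 1000, 1000]`. Under these assumptions the defaults reach the 1,000 ms cap by the fifth retry.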
2. Using Kafka Producer with Default Retry Configuration
In this example, we don’t explicitly configure retries, so the Kafka producer uses its default settings. By default, retries is set to Integer.MAX_VALUE, meaning the producer keeps retrying until the message is successfully sent or the delivery timeout is reached.
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerDefaultRetries {

    public static Properties producerConfig() {
        Properties props = new Properties();
        // No explicit retries configuration, so the default settings apply
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return props;
    }

    public static void main(String[] args) {
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig());
        ProducerRecord<String, String> record =
                new ProducerRecord<>("test_topic", "key", "Message with default retries");
        try {
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata metadata = future.get(); // Block until the send succeeds or fails
            System.out.printf("Message sent successfully to topic %s, partition %d, offset %d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        } catch (ExecutionException | InterruptedException e) {
            System.err.println("Failed to send message after retries: " + e.getMessage());
        } finally {
            producer.close();
        }
    }
}
With these default settings, the producer attempts to send the message. If the Kafka broker becomes unavailable after the topic metadata has been fetched, the producer keeps retrying until delivery.timeout.ms is reached, which by default is 120,000 ms (2 minutes).
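The interplay between the retry count and delivery.timeout.ms can be sketched with a virtual clock. This hypothetical simulation assumes every attempt fails after a fixed attemptMs and that retries wait a fixed backoffMs. It counts the attempts that start before the delivery timeout. It is a simplified model, not the producer's actual scheduler.

```java
// Hypothetical virtual-clock simulation; not the producer's actual scheduler.
public class DeliveryTimeoutSketch {

    // Counts attempts that start before either the retry budget or the delivery
    // timeout runs out, assuming each attempt fails after attemptMs and each
    // retry waits a fixed backoffMs.
    static int attemptsBeforeGivingUp(int retries, long attemptMs, long backoffMs, long deliveryTimeoutMs) {
        long elapsed = 0;
        int attempts = 0;
        while (true) {
            attempts++;                       // start an attempt
            elapsed += attemptMs;             // ...which fails after attemptMs
            int retriesUsed = attempts - 1;
            long nextStart = elapsed + backoffMs;
            if (retriesUsed >= retries || nextStart >= deliveryTimeoutMs) {
                return attempts;              // give up: retry budget or deadline exhausted
            }
            elapsed = nextStart;              // wait out the backoff, then retry
        }
    }

    public static void main(String[] args) {
        // Unlimited retries, 30,000 ms per failed attempt (the default request.timeout.ms),
        // a fixed 100 ms backoff, and the default 120,000 ms delivery timeout
        System.out.println(attemptsBeforeGivingUp(Integer.MAX_VALUE, 30_000, 100, 120_000));
    }
}
```

Running main prints `4`. Even with retries left at Integer.MAX_VALUE, only four attempts start within 120,000 ms under these assumptions. The delivery timeout, rather than the retry count, is what ends the send.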
Simulating a Failure:
To test the retry mechanism, we can simulate a failure by shutting down the Kafka broker temporarily.
Console Output (Using Default Retries):
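Suppose the broker is already down when the example starts. The producer cannot fetch metadata for test_topic, so the send fails once max.block.ms (60,000 ms by default) elapses, before delivery.timeout.ms comes into play. The resulting error looks like this: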
Failed to send message after retries: org.apache.kafka.common.errors.TimeoutException: Topic test_topic not present in metadata after 60000 ms.
3. Custom Kafka Retry Configuration
In this scenario, we simulate a temporary failure (such as a brief network issue or Kafka broker restart). The producer will retry sending the message, and after a few attempts, the broker becomes available, allowing the message to be delivered successfully.
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerRetriesSuccessful {

    public static Properties producerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Customize retry behaviour (retries are already enabled by default)
        props.put(ProducerConfig.RETRIES_CONFIG, 5);               // Retry up to 5 times
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 500);    // Wait 500 ms between retries
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); // 2-second timeout for each request
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return props;
    }

    public static void main(String[] args) {
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig());
        ProducerRecord<String, String> record =
                new ProducerRecord<>("test_topic", "key", "Message that will succeed after retries");
        try {
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata metadata = future.get(); // Block until the send succeeds or fails
            System.out.printf("Message sent successfully to topic %s, partition %d, offset %d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        } catch (ExecutionException | InterruptedException e) {
            System.err.println("Failed to send message after retries: " + e.getMessage());
        } finally {
            producer.close();
        }
    }
}
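In this example, the producer retries up to five times before failing. It waits 500 ms between attempts, and each request times out after 2,000 ms. To simulate a temporary outage from which the producer can recover, stop the Kafka broker, run this example, and start the broker again within a few seconds. Note that retries counts failed requests. While no broker is reachable, the producer waits for one rather than using up retry attempts. That wait is bounded by max.block.ms for metadata and by delivery.timeout.ms for the record.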
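A quick worst-case calculation shows which limit applies to this custom configuration. The sketch assumes that each of the six attempts (the first send plus five retries) fails only after the full 2,000 ms request timeout. It also assumes the backoff stays at a fixed 500 ms, matching the comment in the code. Clients with exponential backoff would wait somewhat longer on later retries, up to retry.backoff.max.ms. The class name is hypothetical.

```java
// Hypothetical back-of-the-envelope calculation for the custom configuration.
public class RetryBudgetSketch {

    // Worst case: every attempt fails after the full request timeout,
    // with a fixed backoff between consecutive attempts.
    static long worstCaseMs(int retries, long requestTimeoutMs, long backoffMs) {
        long attempts = retries + 1L; // the first send plus each retry
        return attempts * requestTimeoutMs + retries * backoffMs;
    }

    public static void main(String[] args) {
        long worst = worstCaseMs(5, 2_000, 500);
        long deliveryTimeoutMs = 120_000; // default delivery.timeout.ms
        System.out.println("Worst case: " + worst + " ms; delivery timeout: " + deliveryTimeoutMs + " ms");
        System.out.println(worst < deliveryTimeoutMs
                ? "The retries limit is reached first"
                : "The delivery timeout is reached first");
    }
}
```

Under these assumptions the worst case is 6 × 2,000 + 5 × 500 = 14,500 ms. That is well below the default 120,000 ms delivery timeout, so the five-retry limit is the one that ends a persistently failing send.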
Console Output (Retries Successful):
Once Kafka becomes available again and the message is successfully delivered, the following output is expected:
Message sent successfully to topic test_topic, partition 0, offset 0
This output confirms that the message was delivered once the broker became available again. The producer handles any retries internally, so they do not appear in this output.
4. Conclusion
In this article, we explored how to configure Kafka producer retries, demonstrating both the default behaviour and how to customize retry settings for improved reliability. By tuning settings such as the retry limit, backoff timing, and delivery timeout, we can handle temporary failures gracefully and improve the reliability of message delivery in our Kafka applications.