Send your data async on Kafka
For a project, I’m trying to log the basic transactions of the user such as addition and removal of an item and for multiple types of items and sending a message to kafka for each transaction. The accuracy of the log mechanism is not crucial and I don’t want it to block my business code in the case of kafka server downtime. In this case an async approach for sending data to kafka is a better way to go.
My kafka producer code is in its boot project. For making it async, I just have to add two annotations: @EnableAsync and @Async.
@EnableAsync will be used in your configuration class (also remember that your class with @SpringBootApplication is also a config class) and will try to find a TaskExecutor bean. If not, it creates a SimpleAsyncTaskExecutor. SimpleAsyncTaskExecutor is ok for toy projects but for anything larger than that it’s a bit risky since it does not limit concurrent threads and does not reuse threads. So to be safe, we will also add a task executor bean.
So,
1 2 3 4 5 6 | @SpringBootApplication public class KafkaUtilsApplication { public static void main(String[] args) { SpringApplication.run(KafkaUtilsApplication. class , args); } } |
will become
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 | @EnableAsync @SpringBootApplication public class KafkaUtilsApplication { public static void main(String[] args) { SpringApplication.run(KafkaUtilsApplication. class , args); } @Bean public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize( 2 ); executor.setMaxPoolSize( 2 ); executor.setQueueCapacity( 500 ); executor.setThreadNamePrefix( "KafkaMsgExecutor-" ); executor.initialize(); return executor; } } |
As you can see there’s not much change here. The default values I set should be tweaked based on your app’s needs.
The second thing we need is addition of @Async.
My old code was:
01 02 03 04 05 06 07 08 09 10 11 12 13 | @Service public class KafkaProducerServiceImpl implements KafkaProducerService { private static final String TOPIC = "logs" ; @Autowired private KafkaTemplate<String, KafkaInfo> kafkaTemplate; @Override public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) { kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus); } } |
As you can see the sync code is quite straightforward. It just takes the kafkaTemplate and sends a message object to the “logs” topic. My new code is a bit longer than that.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | @Service public class KafkaProducerServiceImpl implements KafkaProducerService { private static final String TOPIC = "logs" ; @Autowired private KafkaTemplate kafkaTemplate; @Async @Override public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) { ListenableFuture<SendResult<String, KafkaInfo>> future = kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus)); future.addCallback( new ListenableFutureCallback<>() { @Override public void onSuccess( final SendResult<String, KafkaInfo> message) { // left empty intentionally } @Override public void onFailure( final Throwable throwable) { // left empty intentionally } }); } } |
Here onSuccess() is not really meaningful for me. But onFailure() I can log the exception so I’m informed if there’s a problem with my kafka server.
There’s another thing I have to share with you. For sending an object through kafkatemplate, I have to equip it with the serializer file I have.
01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 | public class KafkaInfoSerializer implements Serializer<kafkainfo> { @Override public void configure(Map map, boolean b) { } @Override public byte [] serialize(String arg0, KafkaInfo info) { byte [] retVal = null ; ObjectMapper objectMapper = new ObjectMapper(); try { retVal = objectMapper.writeValueAsString(info).getBytes(); } catch (Exception e) { // log the exception } return retVal; } @Override public void close() { } } |
Also, don’t forget to add the configuration for it. There are several ways of defining serializers for kafka. One of the easiest ways is adding it to application.properties.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer
Now you have a boot project that can send async objects to the desired topic.
Published on Java Code Geeks with permission by Sezin Karli, partner at our JCG program. See the original article here: send your data async on kafka Opinions expressed by Java Code Geeks contributors are their own. |
very good
please, provide import for KafkaProducerService
how about listener, i.g consuming msg with Async?