Send your data async on Kafka

For a project, I log basic user transactions, such as adding or removing an item, across multiple item types, and send a message to Kafka for each transaction. The accuracy of this log is not crucial, and I don’t want it to block my business code if the Kafka server is down. In that case, sending the data to Kafka asynchronously is the better way to go.

My Kafka producer code lives in its own Spring Boot project. To make it async, I just have to add two annotations: @EnableAsync and @Async.

@EnableAsync goes on a configuration class (remember that the class annotated with @SpringBootApplication is also a configuration class). It looks for a TaskExecutor bean and, if it finds none, falls back to a SimpleAsyncTaskExecutor. SimpleAsyncTaskExecutor is fine for toy projects, but for anything larger it is risky because it neither limits the number of concurrent threads nor reuses them. So, to be safe, we will also add a task executor bean.

So,

@SpringBootApplication
public class KafkaUtilsApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaUtilsApplication.class, args);
    }
}

will become

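import java.util.concurrent.Executor;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@EnableAsync
@SpringBootApplication
public class KafkaUtilsApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaUtilsApplication.class, args);
    }

    // Bounded pool for @Async methods, used instead of SimpleAsyncTaskExecutor
    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);    // threads kept in the pool
        executor.setMaxPoolSize(2);     // upper bound on threads
        executor.setQueueCapacity(500); // tasks that may wait for a free thread
        executor.setThreadNamePrefix("KafkaMsgExecutor-");
        executor.initialize();
        return executor;
    }
}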

As you can see, there’s not much change here. The values I set are only a starting point and should be tuned to your app’s needs.
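To get a feel for what the pool size and queue capacity mean, here is a minimal sketch that uses a plain java.util.concurrent.ThreadPoolExecutor, which is what ThreadPoolTaskExecutor builds on. The class name BoundedPoolSketch, its run method, and the small numbers (2 threads, a queue of 5, 10 tasks) are made up for illustration and are not part of my project. It assumes the default rejection behaviour, which throws when both the threads and the queue are full.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPoolSketch {

    /** Outcome of one run: how many tasks were rejected and which threads ran the rest. */
    static final class Result {
        final int rejected;
        final Set<String> workerThreads;

        Result(int rejected, Set<String> workerThreads) {
            this.rejected = rejected;
            this.workerThreads = workerThreads;
        }
    }

    /** Submits taskCount blocking tasks to a pool with fixed size and a bounded queue. */
    static Result run(int poolSize, int queueCapacity, int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        Set<String> workerThreads = ConcurrentHashMap.newKeySet();
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;

        for (int i = 0; i < taskCount; i++) {
            try {
                pool.execute(() -> {
                    workerThreads.add(Thread.currentThread().getName());
                    try {
                        release.await(); // keep the thread busy, like a send that hangs
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++; // threads busy and queue full: the task is refused
            }
        }

        release.countDown(); // let all accepted tasks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Result(rejected, workerThreads);
    }

    public static void main(String[] args) {
        Result r = run(2, 5, 10);
        System.out.println("rejected=" + r.rejected + ", threads used=" + r.workerThreads.size());
    }
}
```

With two threads busy and five tasks waiting, the remaining three submissions are rejected, and only two threads are ever created. In a Spring app, such a rejection would surface in the calling code as an exception, so if losing a few log messages under load is acceptable, a handler such as ThreadPoolExecutor.DiscardPolicy could be worth considering.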

The second thing we need is the @Async annotation.

My old code was:

@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {
 
    private static final String TOPIC = "logs";
 
    @Autowired
    private KafkaTemplate<String, KafkaInfo> kafkaTemplate;
 
    @Override
    public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) {
        kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus));
    }
}

As you can see the sync code is quite straightforward. It just takes the kafkaTemplate and sends a message object to the “logs” topic. My new code is a bit longer than that.

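import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {

    private static final String TOPIC = "logs";

    @Autowired
    private KafkaTemplate<String, KafkaInfo> kafkaTemplate;

    // Runs on the task executor, so the caller is not held up by the send
    @Async
    @Override
    public void sendMessage(String id, KafkaType kafkaType, KafkaStatus kafkaStatus) {
        ListenableFuture<SendResult<String, KafkaInfo>> future = kafkaTemplate.send(TOPIC, new KafkaInfo(id, kafkaType, kafkaStatus));
        future.addCallback(new ListenableFutureCallback<SendResult<String, KafkaInfo>>() {
            @Override
            public void onSuccess(final SendResult<String, KafkaInfo> message) {
                // left empty intentionally
            }

            @Override
            public void onFailure(final Throwable throwable) {
                // left empty intentionally; this is the place to log the exception
            }
        });
    }
}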

Here, onSuccess() is not really meaningful for me. In onFailure(), however, I can log the exception so that I’m informed if there’s a problem with my Kafka server.
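As an illustration of logging only the failure case, here is a minimal sketch that uses nothing but the JDK’s CompletableFuture. It is not the Kafka API: fakeSend, sendAndLog, and the LOG list are hypothetical stand-ins I made up so the example runs on its own. As far as I know, newer versions of Spring for Apache Kafka (3.x) return a CompletableFuture from send(), so the same whenComplete pattern should carry over there.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class SendCallbackSketch {

    // Collects log lines so the sketch stays self-contained; a real service would use a logger.
    static final List<String> LOG = new CopyOnWriteArrayList<>();

    /** Hypothetical stand-in for a send call: the future fails when the broker is "down". */
    static CompletableFuture<String> fakeSend(String topic, String payload, boolean brokerUp) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (brokerUp) {
            future.complete(topic + ":" + payload);
        } else {
            future.completeExceptionally(new IllegalStateException("broker unreachable"));
        }
        return future;
    }

    /** Sends a message and records a log line only if the send failed. */
    static void sendAndLog(String topic, String payload, boolean brokerUp) {
        fakeSend(topic, payload, brokerUp).whenComplete((result, error) -> {
            if (error != null) {
                LOG.add("send to " + topic + " failed: " + error.getMessage());
            }
            // success is ignored on purpose, like the empty onSuccess()
        });
    }

    public static void main(String[] args) {
        sendAndLog("logs", "item-added", true);
        sendAndLog("logs", "item-removed", false);
        System.out.println(LOG); // prints [send to logs failed: broker unreachable]
    }
}
```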

There’s one more thing I have to share with you. To send an object through KafkaTemplate, I have to give it a serializer for that object. Here is the one I use:

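import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

import com.fasterxml.jackson.databind.ObjectMapper;

public class KafkaInfoSerializer implements Serializer<KafkaInfo> {

    // ObjectMapper is thread-safe once configured, so one instance is reused
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // no configuration needed
    }

    @Override
    public byte[] serialize(String topic, KafkaInfo info) {
        byte[] retVal = null;
        try {
            // writes the object as UTF-8 encoded JSON
            retVal = objectMapper.writeValueAsBytes(info);
        } catch (Exception e) {
            // log the exception
        }
        return retVal;
    }

    @Override
    public void close() {
    }
}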

Also, don’t forget to add the configuration for it. There are several ways of defining serializers for Kafka. One of the easiest is to add them to application.properties.

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.sezinkarli.kafkautils.serializer.KafkaInfoSerializer

Now you have a Boot project that can send objects asynchronously to the desired topic.
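To make concrete what “asynchronously” buys the caller, here is a small JDK-only sketch of the idea behind an @Async call: the work is handed to an executor and the caller moves on while the send is still pending. It is only an approximation of what Spring does for you. The class FireAndForgetSketch, its method, and the latch that plays the role of an unresponsive broker are all invented for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class FireAndForgetSketch {

    /** Returns true if the caller got control back while the slow "send" was still pending. */
    static boolean callerReturnsBeforeSendFinishes() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        CountDownLatch brokerResponds = new CountDownLatch(1);
        AtomicBoolean sendFinished = new AtomicBoolean(false);

        // The work is submitted to the pool; this call does not wait for it.
        executor.execute(() -> {
            try {
                brokerResponds.await(); // simulates a send stuck on an unresponsive broker
                sendFinished.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Business code continues here even though the send has not completed.
        boolean returnedEarly = !sendFinished.get();

        brokerResponds.countDown(); // the "broker" finally answers
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return returnedEarly && sendFinished.get();
    }

    public static void main(String[] args) {
        System.out.println("caller was not blocked: " + callerReturnsBeforeSendFinishes());
    }
}
```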

Published on Java Code Geeks with permission by Sezin Karli, partner at our JCG program. See the original article here: send your data async on kafka

Opinions expressed by Java Code Geeks contributors are their own.

Sezin Karli

Mathematics Engineer & Computer Scientist with a passion for software development. Avid learner for new technologies. Currently working as Senior Software Engineer at Sahibinden.com.