Getting Started with Sample Programs for Apache Kafka 0.9
Streaming data is of growing interest to many organizations, and most applications need a producer-consumer model to ingest and process data in real time. Many messaging solutions exist on the market today, but few of them were built to handle the challenges of modern deployments related to IoT, large web-based applications, and big data projects.
Apache Kafka was built at LinkedIn to solve these challenges, and has since been deployed in many projects. Apache Kafka is a fast, scalable, durable, and distributed messaging system.
The goal of this article is to use an end-to-end example with sample code to show you how to:
- Install, configure and start Kafka
- Create new topics
- Write and run a Java producer to post messages to topics
- Write and run a Java consumer to read and process messages from the topics
Credits
This content is based in part on the documentation provided by the Apache Kafka project.
We have added short, realistic sample programs that illustrate how real programs are written using Kafka.
Prerequisites
You will need basic Java programming skills plus access to:
- Apache Kafka 0.9.0
- Apache Maven 3.0 or later
- Git
Installation
Step 1: Download Kafka
Download the Apache Kafka 0.9.0 release and un-tar it.
$ tar -xzf kafka_2.11-0.9.0.0.tgz
$ cd kafka_2.11-0.9.0.0
Step 2: Start the server
Start a ZooKeeper server; Kafka ships with a built-in single-node ZooKeeper configuration.
$ bin/zookeeper-server-start.sh config/zookeeper.properties &
[2016-02-08 14:59:28,275] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2016-02-08 14:59:28,276] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
...
Note that this starts ZooKeeper in the background. To stop ZooKeeper, you will need to bring it back to the foreground and use Ctrl+C, or find the process and kill it. You can now start the Kafka server itself:
$ bin/kafka-server-start.sh config/server.properties
[2016-02-08 15:10:29,945] INFO KafkaConfig values:
...
As with ZooKeeper, this runs the Kafka broker in the background. To stop Kafka, bring it back to the foreground or find the process and kill it explicitly with kill.
Step 3: Create the topics for the example programs
Messages are organized into topics, on which producers post messages and from which consumers read them. Our sample application uses two topics: fast-messages and summary-markers. The following commands create the topics:
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic fast-messages
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic summary-markers
These can be listed:
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
fast-messages
summary-markers
You will see log messages from the Kafka process when you run Kafka commands. You can switch to a different window if these are distracting.
Note: The broker can be configured to auto-create new topics as they are first mentioned by a client application, but that is often considered a bit dangerous because misspelling a topic name doesn't cause a failure; it silently creates a new, empty topic instead.
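If you want to rule that risk out, automatic topic creation can be disabled on the broker side; a minimal sketch for config/server.properties:

# broker-side setting: disable automatic topic creation on first reference
auto.create.topics.enable=false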
Run your First Kafka Application
At this point, you should have a working Kafka broker running on your machine. The next steps are to compile the example programs and play around with the way that they work.
1- Compile and package up the example programs
Clone and compile the repository using the following commands:
$ git clone https://github.com/mapr-demos/kafka-sample-programs.git
$ cd kafka-sample-programs/
$ mvn clean package
For convenience, the example programs project is set up so that the Maven package target produces a single executable, target/kafka-example, that includes all of the example programs and their dependencies.
2- Start the example consumer
Start the consumer using the following command:
$ target/kafka-example consumer
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
The consumer is now started and listening to all the messages on the fast-messages and summary-markers topics. Nothing should happen at this point, because there aren't any messages yet.
3- Run the example producer
In a new terminal window, run the example producer using the following command:
$ target/kafka-example producer
Sent msg number 0
Sent msg number 1000
...
Sent msg number 998000
Sent msg number 999000
The producer sends a large number of messages to fast-messages along with occasional messages to summary-markers.
The consumer running in the other window receives and processes all the messages from these topics:
$ target/kafka-example consumer
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Got 31003 records after 0 timeouts
1 messages received in period, latency(min, max, avg, 99%) = 20352, 20479, 20416.0, 20479 (ms)
1 messages received overall, latency(min, max, avg, 99%) = 20352, 20479, 20416.0, 20479 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 19840, 20095, 19968.3, 20095 (ms)
1001 messages received overall, latency(min, max, avg, 99%) = 19840, 20479, 19968.7, 20095 (ms)
...
1000 messages received in period, latency(min, max, avg, 99%) = 12032, 12159, 12119.4, 12159 (ms)
998001 messages received overall, latency(min, max, avg, 99%) = 12032, 20479, 15073.9, 19583 (ms)
1000 messages received in period, latency(min, max, avg, 99%) = 12032, 12095, 12064.0, 12095 (ms)
999001 messages received overall, latency(min, max, avg, 99%) = 12032, 20479, 15070.9, 19583 (ms)
If you run the producer again, you will see new messages in the consumer terminal window.
A quick look at the Producer and Consumer code
At this point you have Kafka running and a simple Kafka application that sends and consumes messages. It is time to look at the code and understand how the application was created.
Dependencies
To create a Kafka Producer or Consumer (that is, a Kafka client application), you must add the following dependency to your Maven project:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.0</version>
</dependency>
Producer
The sample Producer is a classic Java application with a main() method. This application must:
- Initialize and configure a producer
- Use the producer to send messages
1- Producer Initialization
Creating a producer is quite simple: you just need to instantiate the org.apache.kafka.clients.producer.KafkaProducer class with a set of properties, which looks like:
producer = new KafkaProducer(properties);
In this example, the configuration is externalized in a property file, with the following entries:
bootstrap.servers=localhost:9092
acks=all
...
block.on.buffer.full=true
For this introduction, the most important property to look at is bootstrap.servers, which lists the host and port of the Kafka server/cluster you started earlier in this tutorial.
The other properties control how messages are sent and serialized. You can find information about all the properties in the Producer Configs chapter of the Kafka documentation.
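For illustration, a minimal sketch of that initialization; the resource name producer.props and the helper class are assumptions for this sketch, not necessarily what the sample project uses:

import java.io.InputStream;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class ProducerSetup {
    public static KafkaProducer<String, String> createProducer() throws Exception {
        Properties properties = new Properties();
        // "producer.props" is an assumed resource name for this sketch
        try (InputStream in = ProducerSetup.class.getResourceAsStream("/producer.props")) {
            properties.load(in);
        }
        // key.serializer / value.serializer must be set in the properties file,
        // e.g. org.apache.kafka.common.serialization.StringSerializer
        return new KafkaProducer<>(properties);
    }
}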
2- Message posting
Once you have a producer instance, you can post messages to a topic using the ProducerRecord class. In its simplest form, a ProducerRecord pairs:
- the topic to send to
- the message value
As you can guess, sending a message to the topic is straightforward:
...
producer.send(new ProducerRecord("fast-messages", "This is a dummy message"));
...
Note that there are other ProducerRecord constructors that let you pass more parameters, such as a message key or a partition number, but these parameters are not used in this simple tutorial.
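For reference, a hedged sketch of those richer constructors; the key and partition values here are placeholders:

// topic and value only (what this tutorial uses)
producer.send(new ProducerRecord<String, String>("fast-messages", "message body"));
// topic, key, value: the key drives partition assignment for the record
producer.send(new ProducerRecord<String, String>("fast-messages", "a-key", "message body"));
// topic, partition, key, value: pin the record to an explicit partition
producer.send(new ProducerRecord<String, String>("fast-messages", 0, "a-key", "message body"));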
The sample application's producer posts messages in a loop, sending (see the sketch after this list):
- 1 message every iteration to the fast-messages topic
- 1 marker message every 1000 iterations to the fast-messages topic
- 1 message every 1000 iterations to the summary-markers topic
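A sketch of the shape of that loop; the JSON field names ("type", "t", "k") and exact payloads are illustrative assumptions, though the article does confirm messages of type test and type marker carrying a timestamp used for latency:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerLoop {
    public static void sendAll(KafkaProducer<String, String> producer, int count) {
        for (int i = 0; i < count; i++) {
            // one message per iteration to fast-messages
            producer.send(new ProducerRecord<>("fast-messages",
                    String.format("{\"type\":\"test\",\"t\":%.3f,\"k\":%d}",
                            System.nanoTime() * 1e-9, i)));
            if (i % 1000 == 0) {
                // a marker message to fast-messages every 1000 iterations
                producer.send(new ProducerRecord<>("fast-messages",
                        String.format("{\"type\":\"marker\",\"t\":%.3f,\"k\":%d}",
                                System.nanoTime() * 1e-9, i)));
                // and one message to summary-markers
                producer.send(new ProducerRecord<>("summary-markers",
                        String.format("{\"type\":\"other\",\"t\":%.3f,\"k\":%d}",
                                System.nanoTime() * 1e-9, i)));
                producer.flush();
                System.out.println("Sent msg number " + i);
            }
        }
    }
}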
3- Producer End
Once you are done with the producer, call the producer.close() method, which blocks until all outstanding messages have been sent to the server. This call is made in a finally block to guarantee that it runs. A Kafka producer can also be used in a try-with-resources construct.
...
} finally {
    producer.close();
}
...
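Since the producer implements java.io.Closeable, the try-with-resources form looks roughly like this sketch:

try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    producer.send(new ProducerRecord<>("fast-messages", "This is a dummy message"));
} // close() is called automatically here, flushing any pending messages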
4- Producer execution
As mentioned earlier, the producer is a simple Java class; in this example application, the Producer is started from the Run application as follows:
...
Producer.main(args);
...
Now that you know how to send messages to the Kafka server let’s look at the consumer.
Consumer
The Consumer class, like the producer, is a simple Java class with a main() method.
This sample consumer uses the HdrHistogram library to record and analyze the messages received from the fast-messages topic, and Jackson to parse JSON messages.
This is why you see the following dependencies in the pom.xml file:
<dependency>
    <groupId>org.hdrhistogram</groupId>
    <artifactId>HdrHistogram</artifactId>
    <version>2.1.8</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.5.1</version>
</dependency>
Let’s now focus on the consumer code.
1- Consumer Initialization
The first thing to do is to create a consumer instance of the org.apache.kafka.clients.consumer.KafkaConsumer class with a set of properties, which looks like:
consumer = new KafkaConsumer(properties);
In this example, the properties are externalized in a file, with the following entries:
bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
...
max.partition.fetch.bytes=2097152
For this introduction, the most important properties to look at are:
- bootstrap.servers, the host and port of the Kafka server/cluster you started earlier in this tutorial
- group.id, the group of consumer processes to which this consumer belongs.
The other properties control how messages are consumed. You can find information about all the properties in the Consumer Configs chapter of the Kafka documentation.
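A minimal sketch of this initialization, mirroring the producer; the resource name consumer.props and the helper class are assumptions for this sketch:

import java.io.InputStream;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerSetup {
    public static KafkaConsumer<String, String> createConsumer() throws Exception {
        Properties properties = new Properties();
        // "consumer.props" is an assumed resource name for this sketch
        try (InputStream in = ConsumerSetup.class.getResourceAsStream("/consumer.props")) {
            properties.load(in);
        }
        // key.deserializer / value.deserializer must be set in the properties,
        // e.g. org.apache.kafka.common.serialization.StringDeserializer
        return new KafkaConsumer<>(properties);
    }
}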
2- Topics Subscription
A consumer can subscribe to one or more topics. In this example, the consumer listens for messages from two topics using the following code:
consumer.subscribe(Arrays.asList("fast-messages", "summary-markers"));
3- Message Consumption
Now that your consumer has subscribed to the topics, it can poll messages from them in a loop. The loop looks like:
...
while (true) {
    ConsumerRecords records = consumer.poll(200);
    ...
}
...
The poll method is called repeatedly in the loop. On each call, the consumer reads records from the topic and tracks the read offset, so the next call picks up from the right message. The poll method takes a timeout in milliseconds, and will wait up to that long if no data is available.
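Offset tracking here relies on enable.auto.commit=true (shown in the properties above), so offsets are committed automatically in the background. If you prefer explicit control, a hedged sketch of committing manually:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitLoop {
    // assumes enable.auto.commit=false in the consumer properties
    public static void pollLoop(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            // ... process records here ...
            consumer.commitSync();  // commit the offsets returned by the last poll()
        }
    }
}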
The object returned by the poll method is an Iterable containing the received records, so you just need to loop over the records to process them. The code to process the messages in the consumer looks like:
...
for (ConsumerRecord record : records) {
    switch (record.topic()) {
        case "fast-messages":
            // deal with messages from fast-messages topic
            ...
            break;
        case "summary-markers":
            // deal with messages from summary-markers topic
            ...
            break;
        default:
    }
}
...
In the sample application, the consumer only processes messages from the fast-messages topic, relying on the fact that messages are consumed in the order the producer sent them (see the sketch after this list):
- for each JSON message of type test, the latency is added to the histogram stats
- when a JSON message of type marker is processed, the stats are printed and reset.
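A hedged sketch of that logic with HdrHistogram; the Histogram methods are the library's real API, but the surrounding class, units, and bounds are illustrative, not the sample's exact code:

import org.HdrHistogram.Histogram;

public class LatencyStats {
    // track latencies with 3 significant digits; the upper bound is illustrative
    private final Histogram stats = new Histogram(10_000_000L, 3);

    // called for each "test" message
    public void onTest(long latencyMicros) {
        stats.recordValue(latencyMicros);
    }

    // called for each "marker" message: report the period, then reset
    public void onMarker(long count) {
        System.out.printf("%d messages, latency(min, max, avg, 99%%) = %d, %d, %.1f, %d%n",
                count, stats.getMinValue(), stats.getMaxValue(),
                stats.getMean(), stats.getValueAtPercentile(99.0));
        stats.reset();
    }
}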
4- Consumer end
Once you are done with the consumer, call the consumer.close() method to free resources. This is especially important in a multithreaded application. The sample consumer does not call this method; it is stopped with a Ctrl+C, which stops the whole JVM.
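If you do need an orderly shutdown from another thread, the usual pattern is consumer.wakeup(), which makes a blocked poll() throw WakeupException; a sketch:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ShutdownAwareConsumer {
    // Another thread (e.g. a JVM shutdown hook) calls consumer.wakeup()
    // to make the blocking poll() below throw WakeupException.
    public static void run(KafkaConsumer<String, String> consumer) {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(200);
                // ... process records here ...
            }
        } catch (WakeupException e) {
            // expected during shutdown
        } finally {
            consumer.close();  // free sockets and buffers
        }
    }
}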
5- Consumer execution
As mentioned earlier, the Consumer is a simple Java class; in this example application, the consumer is started from the Run application as follows:
...
Consumer.main(args);
...
Conclusion
In this article you have learned how to create a simple Kafka 0.9.x application using:
- a producer that publishes JSON messages into multiple topics
- a consumer that receives JSON messages and calculates statistics from message content.
This application is very simple; you can extend it to test other interesting features of Kafka:
- add new consumers, using different groups, to process the data differently, for example to save data into a NoSQL database like HBase or MapR-DB
- add new partitions and consumers for the topics to provide high availability to your application.
Finally, while this example is based on Apache Kafka, the same code will work directly on a MapR cluster using MapR Streams, an integrated messaging system that is compatible with the Kafka 0.9.0 API. With MapR Streams you simplify the production deployment of your application, since it is integrated into the MapR data platform and you have a single cluster to deploy and manage. Using MapR Streams for messaging also provides the additional scalability, security, and big data services associated with the MapR Converged Data Platform.
Reference: | Getting Started with Sample Programs for Apache Kafka 0.9 from our JCG partner Tugdual Grall at the Mapr blog. |