Kafka & Zookeeper for Development: Connecting Brokers to the Ensemble
Previously we successfully created a Zookeeper ensemble; now it’s time to add some Kafka brokers that connect to the ensemble, and we shall execute some commands against them.
We will pick up from the docker-compose file we put together previously. First, let’s look at the configuration a Kafka broker needs.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
group.initial.rebalance.delay.ms=0
socket.send.buffer.bytes=102400
delete.topic.enable=true
socket.request.max.bytes=104857600
log.cleaner.enable=true
log.retention.check.interval.ms=300000
log.retention.hours=168
num.io.threads=8
broker.id=0
log4j.opts=-Dlog4j.configuration=file:/etc/kafka/log4j.properties
log.dirs=/var/lib/kafka
auto.create.topics.enable=true
num.network.threads=3
socket.receive.buffer.bytes=102400
log.segment.bytes=1073741824
num.recovery.threads.per.data.dir=1
num.partitions=1
zookeeper.connection.timeout.ms=6000
zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
Let’s go through the ones that are essential to know.
- offsets.topic.replication.factor: the replication factor for the internal offsets topic
- transaction.state.log.replication.factor: the replication factor for the internal transaction state topic
- transaction.state.log.min.isr: the minimum in sync replicas for the internal transaction topic
- delete.topic.enable: if not true Kafka will ignore the delete topic command
- socket.request.max.bytes: the maximum number of bytes allowed in a single request
- log.retention.check.interval.ms: how often log segments are checked to see whether they can be deleted
- log.retention.hours: how many hours a log segment is retained before being deleted (a topic-level override example follows this list)
- broker.id: the broker id of this installation; it must be unique within the cluster
- log.dirs: the directories where Kafka will store the log data; can be a comma-separated list
- auto.create.topics.enable: automatically create a topic when messages are sent to or consumed from it, or when its metadata is requested, and the topic does not yet exist
- num.network.threads: the number of threads for receiving requests from and sending responses to the network
- socket.receive.buffer.bytes: the receive buffer size of the server socket
- log.segment.bytes: the maximum size of a single log segment file
- num.recovery.threads.per.data.dir: the number of threads used for log recovery at startup and for flushing at shutdown
- num.partitions: the default number of partitions a topic gets on creation if no partition count is specified
- zookeeper.connection.timeout.ms: the maximum time a client waits to establish a connection to ZooKeeper
- zookeeper.connect: the list of ZooKeeper servers to connect to
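As an aside on retention: the broker-wide defaults above can be overridden per topic. A minimal sketch, assuming the ZooKeeper-based kafka-configs tool that ships with this Kafka version, and using the tutorial-topic we create later in this post:

# Run inside one of the Kafka containers once the cluster below is up.
# retention.ms takes precedence over the broker's log.retention.hours for
# this topic; expired segments are removed on the next pass of the cleaner,
# which runs every log.retention.check.interval.ms (300000 ms = 5 minutes).
kafka-configs --zookeeper zookeeper-1:2181 \
  --alter --entity-type topics --entity-name tutorial-topic \
  --add-config retention.ms=3600000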
Now it’s time to create the properties file for each broker. Because of the broker.id property we need a separate file per broker, each with its corresponding broker.id.
So our first broker’s file (broker.id=1), named server1.properties, would look like the following. Keep in mind that those brokers will run from the same docker-compose file, therefore the zookeeper.connect property contains the internal docker-compose DNS names.
socket.send.buffer.bytes=102400
delete.topic.enable=true
socket.request.max.bytes=104857600
log.cleaner.enable=true
log.retention.check.interval.ms=300000
log.retention.hours=168
num.io.threads=8
broker.id=1
transaction.state.log.replication.factor=1
log4j.opts=-Dlog4j.configuration\=file\:/etc/kafka/log4j.properties
group.initial.rebalance.delay.ms=0
log.dirs=/var/lib/kafka
auto.create.topics.enable=true
offsets.topic.replication.factor=1
num.network.threads=3
socket.receive.buffer.bytes=102400
log.segment.bytes=1073741824
num.recovery.threads.per.data.dir=1
num.partitions=1
transaction.state.log.min.isr=1
zookeeper.connection.timeout.ms=6000
zookeeper.connect=zookeeper-1\:2181,zookeeper-2\:2181,zookeeper-3\:2181
The same recipe applies to broker.id=2 and broker.id=3; one way to generate those files is sketched below.
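A minimal sketch, assuming server1.properties is in the current directory and that only the broker.id line differs between the three files:

for id in 2 3; do
  # copy server1.properties, swapping in the matching broker.id
  sed "s/^broker.id=1$/broker.id=${id}/" server1.properties > "server${id}.properties"
done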
After creating those three broker configuration files it is time to change our docker-compose configuration.
version: "3.8"
services:
  zookeeper-1:
    container_name: zookeeper-1
    image: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: "1"
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  zookeeper-2:
    container_name: zookeeper-2
    image: zookeeper
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: "2"
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zookeeper-3:2888:3888;2181
  zookeeper-3:
    container_name: zookeeper-3
    image: zookeeper
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: "3"
      ZOO_SERVERS: server.1=zookeeper-1:2888:3888;2181 server.2=zookeeper-2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
  kafka-1:
    container_name: kafka-1
    image: confluent/kafka
    ports:
      - "9092:9092"
    volumes:
      - type: bind
        source: ./server1.properties
        target: /etc/kafka/server.properties
  kafka-2:
    container_name: kafka-2
    image: confluent/kafka
    ports:
      - "9093:9092"
    volumes:
      - type: bind
        source: ./server2.properties
        target: /etc/kafka/server.properties
  kafka-3:
    container_name: kafka-3
    image: confluent/kafka
    ports:
      - "9094:9092"
    volumes:
      - type: bind
        source: ./server3.properties
        target: /etc/kafka/server.properties
Let’s spin up the docker-compose file.
> docker-compose -f docker-compose.yaml up
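Before running any Kafka commands, it is worth checking that all six containers came up. A minimal sketch; the exact startup log line varies between Kafka versions, so the grep pattern below is an assumption:

> docker-compose -f docker-compose.yaml ps
> docker logs kafka-1 2>&1 | grep -i started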
Just like in the previous examples, we shall run some commands through the containers. Now that we have a proper cluster with a Zookeeper ensemble and multiple Kafka brokers, it is time to test that they work together; first, let’s verify that the brokers registered themselves with the ensemble.
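A hedged sketch of that check, assuming the confluent/kafka image ships the zookeeper-shell wrapper on the path; the command lists the broker znodes, so the output should end with the three broker ids, e.g. [1, 2, 3]:

> docker exec -it kafka-1 /bin/bash
confluent@92a6d381d0db:/$ zookeeper-shell zookeeper-1:2181 ls /brokers/ids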
The first action is to create a topic with a replication factor of 3. The expected outcome is for this topic to be replicated across the 3 Kafka brokers.
> docker exec -it kafka-1 /bin/bash
confluent@92a6d381d0db:/$ kafka-topics --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181 --create --topic tutorial-topic --replication-factor 3 --partitions 1
Our topic has been created; let’s check its description.
confluent@92a6d381d0db:/$ kafka-topics --describe --topic tutorial-topic --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
Topic:tutorial-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: tutorial-topic    Partition: 0    Leader: 2    Replicas: 2,1,3    Isr: 2,1,3
As we can see, the leader for the partition is broker 2. Replicas lists the brokers holding a copy of the partition, and Isr shows which of those replicas are currently in sync.
The next step is to put some data into the recently created topic. Before doing so, I will start a consumer listening on that topic; while we post messages to the topic, they will be printed by this consumer.
> docker exec -it kafka-3 /bin/bash
confluent@4042774f8802:/$ kafka-console-consumer --topic tutorial-topic --from-beginning --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
Let’s add some topic data.
> docker exec -it kafka-1 /bin/bash
confluent@92a6d381d0db:/$ kafka-console-producer --topic tutorial-topic --broker-list kafka-1:9092,kafka-2:9092
test1
test2
test3
As expected, the consumer on the other terminal prints the messages.
test1
test2
test3
Since we have a cluster, it would be nice to stop the leader broker and watch another broker take over the leadership. The expected result is that all messages stay replicated and that there is no disruption to consuming and publishing.
Stop the leader, which is kafka-2.
> docker stop kafka-2
Check the leadership from another broker.
confluent@92a6d381d0db:/$ kafka-topics --describe --topic tutorial-topic --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
Topic:tutorial-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: tutorial-topic    Partition: 0    Leader: 1    Replicas: 2,1,3    Isr: 1,3
The leader now is kafka-1; note that the Isr set has shrunk to 1,3 since kafka-2 is down, while Replicas still lists all three brokers.
Read the messages to verify that they were indeed replicated.
> docker exec -it kafka-3 /bin/bash
confluent@4042774f8802:/$ kafka-console-consumer --topic tutorial-topic --from-beginning --zookeeper zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
test1
test2
test3
As expected, apart from the leadership being handed over, our data has also been replicated!
If we try to post new messages, that also succeeds; a sketch follows.
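For instance, assuming the consumer from before is still running, we can point the producer at the surviving brokers (kafka-2 is down, so it is left out of --broker-list), and the new message should be printed by the consumer; test4 here is just an illustrative payload:

> docker exec -it kafka-1 /bin/bash
confluent@92a6d381d0db:/$ kafka-console-producer --topic tutorial-topic --broker-list kafka-1:9092,kafka-3:9092
test4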
To summarise: we ran a Kafka cluster connected to a Zookeeper ensemble, we created a topic replicated across 3 brokers, and, last but not least, we tested what happens when one broker goes down.
In the next blog post we are going to wrap things up so that clients on our local machine can connect to the docker-compose ensemble.