In this post I will show how to use Docker containers to create and scale a Kafka cluster, and also how to create, scale, and move topics inside the cluster.
Repository: https://github.com/jeqo/post-scale-kafka-containers
Single-Node Cluster
First of all, let’s start with the simplest way to run Kafka on Docker, which can be useful for some development scenarios: a single-node cluster.
Apache Kafka's architecture is based on 2 main components: the Apache Kafka server itself, and the Apache Zookeeper server used for internal coordination.
That’s why a Kafka single-node cluster requires at least a couple of processes.
In container terms and practices, these processes should run in 2 different containers.
I will use a couple of Docker images. These are fairly simple and you can find their source code here: Apache Kafka, Apache Zookeeper, and Confluent Platform
The easiest way to do this is to define these processes as Docker Compose services in a kafka-cluster/docker-compose.yml file:
version: "2.1"
services:
  kafka:
    image: jeqo/apache-kafka:0.10.1.0-2.11
    links:
      - zookeeper
  zookeeper:
    image: jeqo/apache-zookeeper:3.4.8
This configuration defines 2 services: kafka and zookeeper. The kafka service link and the ZOOKEEPER_CONNECT environment variable configure access from the kafka service to the zookeeper service.
If we start this configuration with docker-compose up -d, Docker Compose will create a network where these services can communicate.
jeqo@jeqo-Oryx-Pro:.../single-node-kafka-cluster$ docker-compose up -d
Creating network "kafkacluster_default" with the default driver
Creating kafkacluster_zookeeper_1
Creating kafkacluster_kafka_1
If you want to communicate with the cluster from your application’s docker-compose configuration, you can do it as follows:
version: "2.1"
services:
  kafka:
    image: jeqo/apache-kafka-client:0.10.1.0-2.11
    command: sleep infinity
    networks:
      - default
      - kafkacluster_default #(2)
networks: #(1)
  kafkacluster_default:
    external: true
Here we first define an external network called kafkacluster_default (1), which gives us access to the Kafka cluster network. Then we add this network to the service's networks (2).
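The external network name has to match the one Docker Compose created for the cluster, and that name depends on the directory the cluster was started from. The following is a minimal sketch of that derivation, assuming the behavior of older Docker Compose releases (lowercase the directory name and drop everything that is not a letter or digit); the function names are hypothetical and newer Compose versions may normalize names differently.

```python
import re


def compose_project_name(directory: str) -> str:
    """Approximate the project name an older Docker Compose derives from a directory."""
    # Assumption: lowercase the name and strip every character
    # that is not a lowercase letter or a digit.
    return re.sub(r"[^a-z0-9]", "", directory.lower())


def default_network_name(directory: str) -> str:
    """Name of the default network Compose creates for that project."""
    return compose_project_name(directory) + "_default"


print(default_network_name("kafka-cluster"))
# kafkacluster_default
print(default_network_name("single-node-kafka-cluster"))
# singlenodekafkacluster_default
```

If in doubt, docker network ls shows the actual name to put under networks.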
To test our client, start it up by running docker-compose up -d and then connect to the cluster with the following commands:
$ docker-compose exec kafka bash
# bin/kafka-console-producer.sh --broker-list kafka:9092 --topic topic1
test
# bin/kafka-topics.sh --zookeeper zookeeper:2181 --list
topic1
Multi-Node Cluster
Scaling a container with Docker Compose is as simple as using the scale command:
docker-compose scale kafka=3
This will create 2 more containers:
$ docker-compose scale kafka=3
Creating and starting kafkacluster_kafka_2 ... done
Creating and starting kafkacluster_kafka_3 ... done
You, as an application developer, only need to know one of the broker IPs, or use the service name, to connect to the cluster. As the documentation specifies, the client (e.g. producer or consumer) will use it only once, to get the IPs of the Kafka brokers in the same cluster. This means that Kafka scaling will be transparent to your application.
To validate that all brokers are part of the cluster, let’s check with the Zookeeper client. From the client container:
$ docker-compose exec kafka bash
# bin/zookeeper-shell.sh zookeeper:2181
ls /brokers/ids
[1003, 1002, 1001]
Scaling Topics
In Kafka, topics are distributed in partitions. Partitions allow scalability, enabling topics to fit in several nodes, and parallelism, allowing different instances from the same consumer group to consume messages in parallel.
Apart from this, Kafka manages how these partitions are replicated, to achieve high availability. In this case, if you have many replicas of one partition, one will be the leader and there will be zero or more followers spread over different nodes.
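To make the leader and follower idea concrete, here is a toy model in Python. It is not Kafka's actual placement algorithm: it simply assumes a round-robin spread of replicas over the broker ids and treats the first replica of each partition as the leader. The function name assign_replicas is hypothetical.

```python
def assign_replicas(brokers, partitions, replication_factor):
    """Toy round-robin placement: the first replica of each partition is the leader."""
    if replication_factor > len(brokers):
        raise ValueError("replication factor cannot exceed the number of brokers")
    assignment = {}
    for partition in range(partitions):
        # Pick consecutive brokers, wrapping around the broker list.
        replicas = [
            brokers[(partition + offset) % len(brokers)]
            for offset in range(replication_factor)
        ]
        assignment[partition] = {"leader": replicas[0], "followers": replicas[1:]}
    return assignment


layout = assign_replicas([1001, 1002, 1003], partitions=3, replication_factor=2)
for partition, roles in layout.items():
    print(partition, roles)
# 0 {'leader': 1001, 'followers': [1002]}
# 1 {'leader': 1002, 'followers': [1003]}
# 2 {'leader': 1003, 'followers': [1001]}
```

With a replication factor of 1 every partition has a leader and zero followers, which is the situation on a single-node cluster.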
How do we configure this using this simple container configuration? Let’s evaluate some scenarios:
Adding new topics to the cluster
Once you scale your cluster, Kafka won’t use these new nodes unless new topics are created.
Let’s test it following these steps:
- Start a single node cluster
- Then start the client, create a topic topic1, and describe the topic to check the broker
- Scale your cluster to 3 nodes
- Add topics to occupy other brokers
Using multiple partitions:
Or using a replication factor:
Which replication factor and how many partitions to use depends on your use case. This deserves its own blog post.
Expanding topics in your cluster
Expanding topics in your cluster means moving topics and partitions once you have more brokers in your cluster because, as shown before, your new brokers won’t store any data after they are created unless you create new topics.
You can do this in 3 steps:
- Identify which topics you want to move.
- Generate a candidate reassignment. This could be done automatically, or you can decide how to redistribute your topics.
- Execute your reassignment plan.
You can do this following the documentation here: http://kafka.apache.org/documentation/#basic_ops_cluster_expansion
The steps described in the documentation are automated a bit with Ansible:
Inside the playbooks/prepare-reassignment.yml file you have 2 variables:
vars:
  topics:
    - topic1
  broker_list: 1003
This will prepare a recipe to move your topic topic1 to the broker with id 1003.
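For reference, the Kafka documentation describes the input to the generate step as a small "topics to move" JSON file plus a broker list. The sketch below builds that input from the same two values in Python. It illustrates the documented file format only; it is an assumption that the playbook produces something equivalent, and the function name topics_to_move is hypothetical.

```python
import json


def topics_to_move(topics):
    """Build the 'topics to move' document described in the Kafka docs."""
    return {"version": 1, "topics": [{"topic": name} for name in topics]}


# Same values as the playbook variables above.
topics = ["topic1"]
broker_list = "1003"

print(json.dumps(topics_to_move(topics)))
# {"version": 1, "topics": [{"topic": "topic1"}]}
print("target brokers:", broker_list)
# target brokers: 1003
```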
You can paste the generated JSON into playbooks/reassign-topic-plan.json:
{
"version":1,
"partitions":[{"topic":"topic1","partition":0,"replicas":[1003]}]
}
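If you prefer to decide the redistribution yourself instead of using the generated candidate, a plan in this format is easy to build by hand. Here is a minimal Python sketch, assuming every partition of the topic should end up with the same replica list; the helper name reassignment_plan is hypothetical.

```python
import json


def reassignment_plan(topic, partition_count, target_brokers):
    """Build a reassignment plan that places every partition on target_brokers."""
    return {
        "version": 1,
        "partitions": [
            {"topic": topic, "partition": p, "replicas": list(target_brokers)}
            for p in range(partition_count)
        ],
    }


plan = reassignment_plan("topic1", partition_count=1, target_brokers=[1003])
print(json.dumps(plan, separators=(",", ":")))
# {"version":1,"partitions":[{"topic":"topic1","partition":0,"replicas":[1003]}]}
```

The first broker id in each replicas list is the preferred leader for that partition, so the order of target_brokers matters when you list more than one.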
And then run this plan with another playbook: playbooks/execute-reassignment.yml
Confluent Platform images
All of this can be done in the same way with Confluent Platform.
There are a couple of directories, confluent-cluster and confluent-client, to test this out.
I hope this post helps you understand Kafka topics and how containers can help you run clusters in seconds :)
And, you know, run …