En este post mostraré como utilizar contenedores Docker para crear y escalar
un clúster de Kafka, y también como crear, escalar y mover topics
dentro del
clúster.
Repositorio: https://github.com/jeqo/post-scale-kafka-containers
Clúster de un nodo
Primero, comenzaremos con la forma más sencilla de utilizar Docker, que puede ser útil y suficiente para algunos escenarios de desarrollo: un clúster con un nodo
La arquitectura de Apache Kafka esta basada en 2 components principales: El propio servidor de Apache Kakfa, y el servidor de Apache Zookeeper, utilizado por Kafka para su coordinación interna.
Es por eso que un clúster de nodo simple require por lo menos de un par de procesos.
Si hablamos en terminos y prácticas de contenedores
, estos processos deberían
ejecutarse en dos contenedores diferentes.
La forma más sencilla de definir estos procesos en Docker, es con
servicios de Docker Compose
, como están definidos en el archivo
kafka-cluster/docker-compose.yml
Usaré un par de imagenes. Son bastante simples, y el código fuente se encuentra aquí: Apache Kafka, Apache Zookeeper, and Confluent Platform
version: "2.1"
services:
kafka:
image: jeqo/apache-kafka:0.10.1.0-2.11
links:
- zookeeper
zookeeper:
image: jeqo/apache-zookeeper:3.4.8
Esta configuración define 2 servicios: kafka
y zookeeper
. El link
del
servicio kafka
y su variable de entorno ZOOKEEPER_CONNECT
configuran el acceso desde kafka
hacia el servicio zookeeper
.
Si probamos iniciar los servicios con el comando docker-compose up -d
,
Docker Compose creará una red donde estos servicios se podrán comunicar.
jeqo@jeqo-Oryx-Pro:.../single-node-kafka-cluster$ docker-compose up -d
Creating network "kafkacluster_default" with the default driver
Creating kafkacluster_zookeeper_1
Creating kafkacluster_kafka_1
Si queremos acceder a estos servicios desde nuestra aplicación (también definida en Docker Compose) lo podemos hacer de la siguiente manera:
version: "2.1"
services:
kafka:
image: jeqo/apache-kafka-client:0.10.1.0-2.11
command: sleep infinity
networks:
- default
- kafkacluster_default #(2)
networks: #(1)
kafkacluster_default:
external: true
Aquí definimos primero una red externa external network
llamada singlenodekafkacluster_default
que nos permite acceder a la red del clúster de kafka.
Luego agregamos esta red a los servicios que requieren acceso, en este caso
el servicio client
.
Para probar el acceso desde el cliente, primero iniciemos el servicio con
docker-compose up -d
y luego nos conectamos al servicio:
$ docker-compose exec kafka bash
# bin/kafka-console-producer.sh --broker-list kafka:9092 --topic topic1
test
# bin/kafka-topics.sh --zookeeper zookeeper:2181 --list
topic1
Clúster Multi-Nodo
Una vez creado nuestro clúster, escalar nuestro contenedor de ´kafka´
es tan sencillo como utilizar el comando scale
:
docker-compose scale kafka=3
Este comando creará dos contenedores adicionales:
$ docker-compose scale kafka=3
Creating and starting kafkacluster_kafka_2 ... done
Creating and starting kafkacluster_kafka_3 ... done
Para nosotros, como desarrolladores(as) de aplicaciones, solo necesitamos
saber uno de los host o IPs de los broker
(nodo del clúster de Kafka),
para conectarnos al clúster. O también podemos usar el nombre del servicio.
Como la documentación especifíca, el cliente (p.ejem: productor
o consumidor
)
solo utilizará este dato para iniciar la conexión y obtener la lista completa de
brokers
del clúster. Esto significa que la escalabilidad de Kafka es
transparente para nuestra aplicación.
Para validar que todos los brokers son parte del clúster, usaremos el client de Zookeeper.
Desde el contenedor cliente:
$ docker-compose exec kafka bash
# bin/zookeeper-shell.sh zookeeper:2181
ls /brokers/ids
[1003, 1002, 1001]
Escalando Topics
En Kafka, los Topics
son distribuidos en Partitions
. Las Particiones
permiten la escalabilidad, haciendo posible que los Topics
quepan en
varios nodos; y paralelismo, dejando que distintas instancias de un mismo
Grupo de Consumidores puedan consumir messages en paralelo.
Aparte de este beneficio, Kafka tiene la habilidad de replicar estas
Particiones
, logrando alta disponibilidad. En este case, si tienes varias replicas
de una partición
, una será la partición líder
y las demás replicas serán
seguidoras
.
Agregando nuevos Topics
al clúster
Una vez que el clúster tiene mayor número de nodos, Kafka no utilizará estos nuevos nodos hasta que nuevos tópicos sean creados.
Veamos como probamos esto:
- Iniciemos un clúster simple, con un solo nodo
- Luego iniciemos un cliente, y creemos un topic
topic1
- Escalemos el clúster a 3 nodos
- Agregemos topics para ocupar los demás brokers
Usando múltiples particiones:
O usando varias réplicas:
Para decidir que factor de replicación
utilizar o cuantas particiones
,
depende de cada caso de uso. Estos temas merecen su propio post.
Expandiendo Topics
en el clúster
Expandir topics en el clúster significa mover topics
y particiones
una vez que se tengan más brokers
en el clúster
.
Esto se puede realizar en 3 pasos:
-
Identificar que
topics
se quieren mover a un nuevobroker
. -
Generar el plan de reasignación. Esto se puede realizar de forma automática o manual, si se sabe cómo redistribuir los topics.
-
Ejecutar el plan de reasignación.
Estos pasos se encuentran documentados aquí: http://kafka.apache.org/documentation/#basic_ops_cluster_expansion
He automatizado un poco los pasos con unos script en Ansible:
Dentro del archivo playbooks/prepare-reassignment.yml
hay dos variables a definir:
vars:
topics:
- topic1
broker_list: 1003
Estas prepararán un plan para mover el topic topic1
al broker
con id 1003
.
Puedes copiar ese JSON generado en playbooks/reassign-topic-plan.json
{
"version":1,
"partitions":[{"topic":"topic1","partition":0,"replicas":[1003]}]
}
Y ejecutar el otro playbook: playbooks/execute-reassignment.yml
Confluent Platform images
Todos estos pasos se pueden ejecutar igualmente con Confluent Platform.
Para ello, agregué los directorios confluent-cluster
y confluent-client
para
poder probarlo:
Espero que este post los ayude a entender un poco más sobre los topics
en
Kafka y como los contenedores
nos pueden ayudar a crear clústers en segundos :)
Y, ya saben, ejecuten …