Una de las características más importantes de Apache Kafka es el manejo
de múltiples consumidores. Cada consumer group
tiene un offset
, que
determina hasta que punto del topic
se encuentra consumido por consumer group
.
Así, cada consumer group
puede manejar los offset
independientemente, por
partición.
Esto ofrece la posibilidad de retroceder en el tiempo y reprocesar mensaje desde
el inicio de un topic
y regenerar el estado actual del sistema.
Pero, cómo realizar esto de forma programática?
Código fuente: https://github.com/jeqo/post-kafka-rewind-consumer-offset
Conceptos Básicos
Topics y Offsets
Lo primero por entender para lograr retroceder los consumidores en Kafka es:
retroceder sobre qué? Cada topic
esta dividido en partitions
. Los
registros enviados por los Producers
son balanceados entre las partitions
,
así cada partición tiene su propio índice de offsets
.
Cada record
tiene un offset
asignado que será usado por los consumers
para definir qué mensajes han sido consumidos del log.
Consumers y Consumer Groups
Una vez entendido que los topics
tienen partitions
y offsets
por partition
podemos pasar a definir como trabajan los consumers
y consumer groups
.
Consumers
están agrupados por group.id
. Ésta propiedad identifica a cada
consumer group
, así el broker
conoce cúal fue el último record
consumido
por offset
, por partition
.
Antes de continuar, revisemos la clase que funcionará como un Kafka Consumer
simple implementado en Java:
KafkaSimpleProducer.java
:
public static void main(String[] args) {
...
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<>(properties);
IntStream.rangeClosed(1, 100)
.boxed()
.map(number -> new ProducerRecord<>(
"topic-1",
number.toString(),
number.toString()))
.map(record -> producer.send(record))
.forEach(result -> printMetadata(result));
producer.close();
}
Este producer
creará 100 records
en el topic topic-1
, con offsets
de 0
a 99
.
Desde Línea de Comandos
En este primer escenario revisaremos como manejar los offsets de consumers
desde línea de comandos, para tener una idea de como implementarlo en
nuestra aplicación.
Cuando trabajas desde un terminal, si se puede utilizar kafka-console-consumer
sin group.id
definido, un nuevo group.id
es generado internamente:
console-consumer-${new Random().nextInt(100000)}
.
Así que a menos que se utilize el mismo group.id
luego, será como si
creara un nuevo consumer group
cada vez que se inice un terminal con
kafka-console-consumer
.
Por defecto, cuando se conecta a un topic
como un consumer
se inicia con
el último offset
, así que no se recibirán nuevos records
a menos que nuevos
mensajes arriben luego de iniciada la conexión.
En este caso, para ir hacia el inicio del topic sería suficiente con agregar
la opción --from-beginning
a la línea de comandos:
Pero, qué pasaría si se usa la propiedad group.id
?, La ópcion --from-beginning
solo funcionaría la primera vez, ya que el offset
sería registrado en el clúster::
Así que, cómo se regresaría al inicio del log en este caso?
Podemos usar la opción --offset
con estas tres alternativas:
--offset <String: consume offset> The offset id to consume from (a non-
negative number), or 'earliest'
which means from beginning, or
'latest' which means from end
(default: latest)
Desde Clientes Java
Ahora, luego de ver que desde la línea de comandos en sencillo regresar en el tiempo sobre el log; pero, cómo hacer éstas operaciones desde una aplicación?
Si estás utilizando Kafka Consumers en tu aplicación, tienes las siguientes opciones (con Java):
Haciendo la historia corta: Si necesitas capacidades de procesar mensajes
desde Kafka de forma stateful (manteniendo el estado), es recomendable
utilizar Kafka Streams API
.
Si necesitas una API simple para consumir mensajes uno a uno, utiliza
Kafka Consumer API
.
Al momento de escribir este post, éstas son las opciones para hacer rewind
de offsets
desde estas APIs:
-
Kafka Consumer API
soporta regresar al inicio de topic, ir a unoffset
específico, o regresar a un punto en el tiempo (timestamp). -
Kafka Streams API
en este momemnto solo soporta regresar aloffset
inicial de losinput topics
, y se encuentra bien explicado por Matthias J. Sax en su post: [1].
Así que me enfocaré en las opciones disponibles desde el API de Kafka Consumer
.
Un consumidor simple luciría así:
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic-1"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
Este consumidor buscará por records por 100ms
y los imprimirá en la consola.
Ahora veamos como regresar offsets
en distintos escenarios. Consumer API
tiene operaciones #seek
que permiten estas funcionalidades. Mostraré una
forma sencilla de agregar estas operaciones utilizando flags
:
Regresar al offset
inicial
La opción más común es regresar al inicio del topic
, que no siempre será
offset=0
. Esto dependerá de la política de retention
que removerá los
records
antiguos por tiempo o por tamaño del topic
; pero este tema
merece su propio post.
Para ir al inicio de topic
usaremos la operación #seekToBeginning(topicPartition)
:
boolean flag = true;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
if (flag) {
consumer.seekToBeginning(
Stream.of(new TopicPartition("topic-1", 0)).collect(toList()));
flag = false;
}
for (ConsumerRecord<String, String> record : records)
//Consume record
}
Una vez realizada la búsqueda del offset
inicial, para el topic=topic-1
en la partition=0
se reprocesarán los records
nuevamente.
Regresar a un offset
específico
Si podemos reconocer los records
específicos (por partition
) a los que
necesitamos regresar para reprocesar el log, podemos utilizar
#seek(topicPartition, offset)
directamente.
boolean flag = true;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
if(flag) {
consumer.seek(new TopicPartition("topic-1", 0), 90);
flag = false;
}
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
En este casoo, retrocederemos en en topic=topic-1
partition=0
hasta el record
con offset=90
y reprocesaremos los siguiente records
del log.
NOTA: Puede resultar engorroso mapear todos los offsets por partición cuando
tienes varias particiones. Por esto es que la adición de timestamps
ayuda
a resolver este tema.
Regrasar a un offset
por timestamp
Si no conoces exactamente el offset id
del record
desde donde necesitar
reprocesar el log, pero sabes si necesitas regresar 1 hora o 10 minutos atrás
el nuevo índice de timestamp
puede ser útil.
Desde el release 0.10.1.0
, hay un par de mejoras
[2]
[3]
que fueron implementadas, y una nueva operación fue agregada al
Kafka Consumer API
: #offsetsForTimes
:
boolean flag = true;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
if(flag) {
Map<TopicPartition, Long> query = new HashMap<>();
query.put(
new TopicPartition("simple-topic-1", 0),
Instant.now().minus(10, MINUTES).toEpochMilli());
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
result.entrySet()
.stream()
.forEach(entry -> consumer.seek(entry.getKey(), entry.getValue().offset()));
flag = false;
}
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
En este caso primero estamos consultando cuál es el offset
al que tengo que
regresar si quiero reprocesar los records
de hacer 10 minutos,
y luego con el offset
adecuado, utilizamos la operación #seek
.
En el código fuente se ha agregado los pasos para buscar las particiones por
topic
. Esto permitirá reproducir estos pasos en escenarios en los que tengamos
más de una partición por topic
.
Referencias
-
https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
-
https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message
-
https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index
-
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowcanIrewindtheoffsetintheconsumer