Kafka Streams abstracts access to multiple tasks' state stores when reading

til kafka-streams api dev

Kafka Streams applications can scale either horizontally (add more instances) or vertically (add more threads). When scaled vertically, a single instance runs multiple tasks, and each of them keeps the state for its own partitions locally. An interesting question is whether Kafka Streams gives read access (i.e. Interactive Queries) to all of these stores, and how it abstracts access to the different stores managed by multiple tasks.

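The answer is yes: Kafka Streams abstracts away the tasks and their multiple stores. Internally, this is implemented by the CompositeReadOnly*Store classes.

A StoreProvider gives them access to the internal stores managed by the tasks of the Kafka Streams instance, and each lookup goes through those stores one by one, as in this excerpt from the session store variant: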

final List<ReadOnlySessionStore<K, V>> stores = storeProvider.stores(storeName, queryableStoreType);
for (final ReadOnlySessionStore<K, V> store : stores) {
    try {
        final KeyValueIterator<Windowed<K>, V> result =
            store.findSessions(key, earliestSessionEndTime, latestSessionStartTime);

        if (!result.hasNext()) {
            result.close();
        } else {
            return result;
        }
    } catch (final InvalidStateStoreException ise) {
        throw new InvalidStateStoreException(
            "State store  [" + storeName + "] is not available anymore" +
                " and may have been migrated to another instance; " +
                "please re-discover its location from the state metadata.",
            ise
        );
    }
}
return KeyValueIterators.emptyIterator();
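
To see the pattern in isolation, here is a minimal, self-contained sketch that does not depend on Kafka. Every type in it (ReadOnlyStore, StoreProvider, TaskStore, CompositeReadOnlyStore) is a hypothetical, simplified stand-in, not the real Kafka Streams class, and error handling is left out. It also assumes that a given key lives in exactly one task's store, which is why returning the first non-empty result is enough.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are NOT Kafka Streams classes.
class CompositeStoreSketch {

    /** Read-only view of a store. */
    interface ReadOnlyStore<K, V> {
        Iterator<V> find(K key);
    }

    /** Plays the role of the store provider: lists the stores of all local tasks. */
    interface StoreProvider<K, V> {
        List<ReadOnlyStore<K, V>> stores(String storeName);
    }

    /** In-memory store owned by a single task. */
    static final class TaskStore<K, V> implements ReadOnlyStore<K, V> {
        private final Map<K, List<V>> data = new HashMap<>();

        void put(K key, V value) {
            data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }

        @Override
        public Iterator<V> find(K key) {
            return data.getOrDefault(key, Collections.emptyList()).iterator();
        }
    }

    /** Composite facade: callers see one store, lookups fan out to every task store. */
    static final class CompositeReadOnlyStore<K, V> implements ReadOnlyStore<K, V> {
        private final StoreProvider<K, V> provider;
        private final String storeName;

        CompositeReadOnlyStore(StoreProvider<K, V> provider, String storeName) {
            this.provider = provider;
            this.storeName = storeName;
        }

        @Override
        public Iterator<V> find(K key) {
            for (ReadOnlyStore<K, V> store : provider.stores(storeName)) {
                Iterator<V> result = store.find(key);
                if (result.hasNext()) {
                    return result; // first task store that holds the key wins
                }
            }
            return Collections.emptyIterator(); // no local task has the key
        }
    }

    /** Builds two task stores, wraps them in the composite, and looks up one key. */
    static List<String> lookup(String key) {
        TaskStore<String, String> task0 = new TaskStore<>();
        TaskStore<String, String> task1 = new TaskStore<>();
        task0.put("alice", "session-1");
        task1.put("bob", "session-2");
        task1.put("bob", "session-3");

        List<ReadOnlyStore<String, String>> localStores = new ArrayList<>();
        localStores.add(task0);
        localStores.add(task1);
        StoreProvider<String, String> provider = name -> localStores;

        ReadOnlyStore<String, String> composite =
            new CompositeReadOnlyStore<>(provider, "sessions");

        List<String> found = new ArrayList<>();
        composite.find(key).forEachRemaining(found::add);
        return found;
    }

    public static void main(String[] args) {
        System.out.println(lookup("alice")); // [session-1]
        System.out.println(lookup("bob"));   // [session-2, session-3]
        System.out.println(lookup("carol")); // []
    }
}
```

The caller only ever holds the composite and never needs to know which task owns a key. The real implementation additionally closes empty iterators and translates InvalidStateStoreException, as the excerpt above shows.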

I knew about the abstraction, but it was interesting to find out how it is actually implemented.