значения в local state (+ каждый local state сохраняет свое состояние в changelog топик). У всех топиков 3 партиции. Ключи разные, но дистрибуция по ключам одна и та же (используется кастомный партишенер при отправке в исходные топики).
Суть топологии: читать данные из 3-х топиков, складывать значения в local-state, потом с помощью queryable state store реализовать определенную логику джоина данных из 3-х источников.
var properties = new Properties();
properties.putAll(kafkaClusterProperties);
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
builder
.table(topic1)
.transformValues(() -> createStoreProducer(store1), store1)
.toStream().foreach((key, value) -> {
});
builder
.table(topic2)
.transformValues(() -> createStoreProducer(store2), store2)
.toStream().foreach((key, value) -> {
});
builder
.table(topic3)
.transformValues(() -> createStoreProducer(store3), store3)
.toStream().foreach((key, value) -> {
});
var topology = builder.build();
kafkaStreams = new KafkaStreams(topology, properties);
В чем проблема: при запуске 2-х инстансов распределение по партициям происходит совсем не так, как ожидалось:
- У первого инстанса назначились следующие партиции: [topic-one-2, topic-one-0, topic-two-1, topic-three-1]
- У второго: [topic-two-0, topic-one-1, topic-two-2, topic-three-0, topic-three-2]
Ожидалось, что будет так (или иное распределение, но такое, что одни партиции с одинаковыми номерами будут на одном из инстансов):
- instance 1: [topic-one-2, topic-one-0, topic-two-2, topic-two-0, topic-2, topic-three-2, topic-three-0]
- instance 2: [topic-one-1, topic-two-1, topic-three-1]
В чем может быть проблема? Возможно, у меня неправильное представление о kafka-streams, но всегда казалось, что стримы по умолчанию будут стремиться к co-partitioning.
Версия сримов: 2.6.3
Решилось доработкой StreamsPartitionAssignor
Обсуждают сегодня