значения в local state (+ каждый local state сохраняет свое состояние в changelog топик). У всех топиков 3 партиции. Ключи разные, но дистрибуция по ключам одна и та же (используется кастомный партишенер при отправке в исходные топики).
                  
                  
                  
                  
                  
                  Суть топологии: читать данные из 3-х топиков, складывать значения в local-state, потом с помощью queryable state store реализовать определенную логику джоина данных из 3-х источников.
                  
                  
                  
                  
                  
                  var properties = new Properties();
                  
                  
                  properties.putAll(kafkaClusterProperties);
                  
                  
                  properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
                  
                  
                  builder
                  
                  
                          .table(topic1)
                  
                  
                          .transformValues(() -> createStoreProducer(store1), store1)
                  
                  
                          .toStream().foreach((key, value) -> {
                  
                  
                          });
                  
                  
                  
                  
                  
                  builder
                  
                  
                          .table(topic2)
                  
                  
                          .transformValues(() -> createStoreProducer(store2), store2)
                  
                  
                          .toStream().foreach((key, value) -> {
                  
                  
                          });
                  
                  
                  
                  
                  
                  builder
                  
                  
                          .table(topic3)
                  
                  
                          .transformValues(() -> createStoreProducer(store3), store3)
                  
                  
                          .toStream().foreach((key, value) -> {
                  
                  
                          });
                  
                  
                  
                  
                  
                  var topology = builder.build();
                  
                  
                  kafkaStreams = new KafkaStreams(topology, properties);
                  
                  
                  
                  
                  
                  В чем проблема: при запуске 2-х инстансов распределение по партициям происходит совсем не так, как ожидалось:
                  
                  
                  - У первого инстанса назначились следующие партиции: [topic-one-2, topic-one-0, topic-two-1, topic-three-1]
                  
                  
                  - У второго: [topic-two-0, topic-one-1, topic-two-2, topic-three-0, topic-three-2]
                  
                  
                  
                  
                  
                  Ожидалось, что будет так (или иное распределение, но такое, что одни партиции с одинаковыми номерами будут на одном из инстансов):
                  
                  
                  - instance 1: [topic-one-2, topic-one-0, topic-two-2, topic-two-0, topic-2, topic-three-2, topic-three-0]
                  
                  
                  - instance 2: [topic-one-1, topic-two-1, topic-three-1]
                  
                  
                  
                  
                  
                  В чем может быть проблема? Возможно, у меня неправильное представление о kafka-streams, но всегда казалось, что стримы по умолчанию будут стремиться к co-partitioning.
                  
                  
                  
                  
                  
                  Версия сримов: 2.6.3
                  
                  
                
 Nikita
                          Ryanov
                        
                      
                    
                    
                    
                    
                      Автор вопроса
                      
                      
                        
                          Nikita
                          Ryanov
                        
                      
                    
                    
                    
                    
                      Автор вопроса
                    
                    
                  Решилось доработкой StreamsPartitionAssignor
Обсуждают сегодня