тестирования:
1) 4 консюмера в разных окнах терминала - использовался консольный кафка-консумер, общая группа, общий топик
2) продюсер из java кода, он ложит в топик сообщения, равномерно по всем 4 партициям топика
Суть проблемы, создаю чистый топик, заливаю сообщения
подключаются мои 4 консюмера с одной группой, тянут корректно равномерно в первый раз сообщения
ок, кладу снова, и тут что-то клинит, не хотят больше сообщения тянуть
конфигов так таковых и нет, все дефолту там, я из кода консумера, просто указываю топик и группу
в продюсере указал только топик в java коде
Консюмеры корректно партиции между собой распределяют
нужны теории что может быть, почему не работает
Это лишь тесты без java, а вот в java что у меня, там история такая же
В коде так считываю сообщения
@KafkaListener(groupId = "mygroup", topics = "mytopic")
public void listenPartition0(ConsumerRecord<?, ?> record) {
HelpMethods.setLog("Received: " + record.value());
}
конфиг консюмера
@EnableKafka
@Configuration
public class KafkaConfiguration {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "x.x.x.x.x:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "tasks");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
конфиг продюсера
@Configuration
public class KafkaProducerConfig {
@Bean(name = "kafkaTransactionManager")
@ConditionalOnMissingBean(KafkaTransactionManager.class)
public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
return new KafkaTransactionManager(producerFactory);
}
@Bean
public ProducerFactory<?, ?> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "x.x.x.x.x:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
DefaultKafkaProducerFactory<Object, Object> producerFactory =
new DefaultKafkaProducerFactory<>(configProps);
String transactionId = UUID.randomUUID().toString();
producerFactory.setTransactionIdPrefix(transactionId);
return producerFactory;
}
@Bean
public KafkaTemplate<?, ?> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Пишу продюсером так
kafkaTemplate.send(taskTopic, new TaskInstanceKafka(instanceId, taskId, currentTime));
Есть какие-то очевидные ошибки в моем коде?
Залей это куда нибудь, и дай ссылку
А шо по логам? По кафке обычно можно увидеть много логов. И в момент, когда приложение перестает вычитывать, можно увидеть какие-то перераспределения между партициями/ и разные ошибки подключения
я пока с кафкой на ВЫ
ощущение, что что-то падает в какой-то момент и потом консумеры не могут прийти в себя, нормально перераспределиться. так шо я бы смотрел логи
Обсуждают сегодня