170 похожих чатов

Привет всем, буду благодарен за помощь.. нужны варианты что может быть Данные

тестирования:

1) 4 консюмера в разных окнах терминала - использовался консольный кафка-консумер, общая группа, общий топик

2) продюсер из java кода, он ложит в топик сообщения, равномерно по всем 4 партициям топика

Суть проблемы, создаю чистый топик, заливаю сообщения

подключаются мои 4 консюмера с одной группой, тянут корректно равномерно в первый раз сообщения


ок, кладу снова, и тут что-то клинит, не хотят больше сообщения тянуть

конфигов так таковых и нет, все дефолту там, я из кода консумера, просто указываю топик и группу

в продюсере указал только топик в java коде

Консюмеры корректно партиции между собой распределяют

нужны теории что может быть, почему не работает


Это лишь тесты без java, а вот в java что у меня, там история такая же


В коде так считываю сообщения


@KafkaListener(groupId = "mygroup", topics = "mytopic")
public void listenPartition0(ConsumerRecord<?, ?> record) {

HelpMethods.setLog("Received: " + record.value());

}

конфиг консюмера

@EnableKafka
@Configuration
public class KafkaConfiguration {


@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();

config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "x.x.x.x.x:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "tasks");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}

}



конфиг продюсера


@Configuration
public class KafkaProducerConfig {


@Bean(name = "kafkaTransactionManager")
@ConditionalOnMissingBean(KafkaTransactionManager.class)
public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
return new KafkaTransactionManager(producerFactory);
}

@Bean
public ProducerFactory<?, ?> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "x.x.x.x.x:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
DefaultKafkaProducerFactory<Object, Object> producerFactory =
new DefaultKafkaProducerFactory<>(configProps);

String transactionId = UUID.randomUUID().toString();
producerFactory.setTransactionIdPrefix(transactionId);
return producerFactory;
}

@Bean
public KafkaTemplate<?, ?> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

}



Пишу продюсером так

kafkaTemplate.send(taskTopic, new TaskInstanceKafka(instanceId, taskId, currentTime));

Есть какие-то очевидные ошибки в моем коде?

5 ответов

8 просмотров

Залей это куда нибудь, и дай ссылку

А шо по логам? По кафке обычно можно увидеть много логов. И в момент, когда приложение перестает вычитывать, можно увидеть какие-то перераспределения между партициями/ и разные ошибки подключения

Vyacheslark
А шо по логам? По кафке обычно можно увидеть много...

ощущение, что что-то падает в какой-то момент и потом консумеры не могут прийти в себя, нормально перераспределиться. так шо я бы смотрел логи

Похожие вопросы

Обсуждают сегодня

Всем привет, написал код ниже, но он выдает сегфолт, в чем причина? #include <stdio.h> #include <stdlib.h> #include <string.h> struct product { char *name; float price; };...
buzz базз
75
База данных не поможет. Шифрование не поможет. Какие там ещё варианты? Накидывайте.
КТ315
20
А как лучше конвертировать физический адрес в виртуальный при маппинге? В случае ядра у меня, например, direct mapping, первые 768МБ я как есть мапплю в higher half, а остальн...
Evg Resh
26
А табстоп это сообщение от окна или от элемента управления?
The Bird of Hermes
18
Открыл свой двухкилобайтный экзешник в x32dbg, а тут какая-то хрень. Смущает кнопка "выполнить до пользовательского кода", а что ещё может быть в файле помимо него ?
НѣкъиⰘижєжєиꙁъвьсєсвѣтьноѣсѣтиѥсть•
11
Вопрос тем кто смотрит видео и слушает подкасты - как вы потом ищете нужную вам информацию? Вот статью я прочитал, потом могу искать нужную мне часть банальным поиском. Пропус...
Aleksandr Druzhinin
4
Мне были интересны дишные хаки и я нашёл любопытный способ на форуме через __traits, что-то вроде int delegate(int) fac = (int n) => n == 0 ? 1 : n * __traits(parent, {})(n - ...
Constantin F.
1
Всем привет, подскажите/посоветуйте пожалуйста. Фаердак компоненты, имею одно место где бизнес хочет видеть при открытии формы список всех клиентов, это порядка 30к. Мои дово...
Sasha Sch
14
Ребят, если кто в курсе - скажите, а в загранке такое же засилье маркетплейсов? или там простые сермяжные интернет-магазины живут попроще?
Андрей [aharito] Харитонов
14
@FAssembler ты много с формами работал, как цикл обработки сообщений от окошек надо делать, чтобы IsDialogMessage не ломал ввод в эдиты и навигация по табам работала?
The Bird of Hermes
8
Карта сайта