170 похожих чатов

Привет всем, буду благодарен за помощь.. нужны варианты что может быть Данные

тестирования:

1) 4 консюмера в разных окнах терминала - использовался консольный кафка-консумер, общая группа, общий топик

2) продюсер из java кода, он ложит в топик сообщения, равномерно по всем 4 партициям топика

Суть проблемы, создаю чистый топик, заливаю сообщения

подключаются мои 4 консюмера с одной группой, тянут корректно равномерно в первый раз сообщения


ок, кладу снова, и тут что-то клинит, не хотят больше сообщения тянуть

конфигов так таковых и нет, все дефолту там, я из кода консумера, просто указываю топик и группу

в продюсере указал только топик в java коде

Консюмеры корректно партиции между собой распределяют

нужны теории что может быть, почему не работает


Это лишь тесты без java, а вот в java что у меня, там история такая же


В коде так считываю сообщения


@KafkaListener(groupId = "mygroup", topics = "mytopic")
public void listenPartition0(ConsumerRecord<?, ?> record) {

HelpMethods.setLog("Received: " + record.value());

}

конфиг консюмера

@EnableKafka
@Configuration
public class KafkaConfiguration {


@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();

config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "x.x.x.x.x:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "tasks");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}

}



конфиг продюсера


@Configuration
public class KafkaProducerConfig {


@Bean(name = "kafkaTransactionManager")
@ConditionalOnMissingBean(KafkaTransactionManager.class)
public KafkaTransactionManager transactionManager(ProducerFactory producerFactory) {
return new KafkaTransactionManager(producerFactory);
}

@Bean
public ProducerFactory<?, ?> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "x.x.x.x.x:9092");
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
DefaultKafkaProducerFactory<Object, Object> producerFactory =
new DefaultKafkaProducerFactory<>(configProps);

String transactionId = UUID.randomUUID().toString();
producerFactory.setTransactionIdPrefix(transactionId);
return producerFactory;
}

@Bean
public KafkaTemplate<?, ?> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}

}



Пишу продюсером так

kafkaTemplate.send(taskTopic, new TaskInstanceKafka(instanceId, taskId, currentTime));

Есть какие-то очевидные ошибки в моем коде?

5 ответов

19 просмотров

Залей это куда нибудь, и дай ссылку

А шо по логам? По кафке обычно можно увидеть много логов. И в момент, когда приложение перестает вычитывать, можно увидеть какие-то перераспределения между партициями/ и разные ошибки подключения

Vyacheslark
А шо по логам? По кафке обычно можно увидеть много...

ощущение, что что-то падает в какой-то момент и потом консумеры не могут прийти в себя, нормально перераспределиться. так шо я бы смотрел логи

Похожие вопросы

Обсуждают сегодня

Ну вот просто даже давайте вот как. Какой нибудь конкретный кейс, можете в пример привести, где бч работает и приносит прикладную пользу, а не просто что бы было? Не крипту.
Alexander Andreev
22
Всем привет! Имеется функция: function IsValidChar(ch: UTF8Char): Boolean; var i: Integer; ValidChars: AnsiString; begin ValidChars := 'abcdefghijklmnopqrstuvwxyzABCDE...
Евгений
44
объясните пожалуйста, почему функция не работает должным образом? вроде должно брать активное окно сравнивать его размер с размером экрана, и если есть совпадение = true прове...
JF
7
> Копаем глубже > Следующий момент был, когда я спросил его, знает ли он JavaScript. Он ответил, что его учили работать с C#. Я тоже в университете писал на C#, но даже там мн...
Oleg Volkov
4
лучше скажите, причём тут паскаль?
Alexey Kulakov
36
И никого не интересует какие пакеты кто использует. ((% Заходишь на сайт симфони и видишь поддержку Украины - по законам РФ это ж экстремизм. Только никто не отказывается от с...
Am Ambrion
11
Чтобы перехватить все нажимания буков на форме, надо хук ставить? Пробовал на форме ОнКейДаун, оно ловит клаву если фокус не на компоненте с вводом текста
Serjone
15
Народ! Впервые клиенту пришло письмо от РКН, у вас, дескать, есть яндекс метрика, а нигде не написано, что вы ее юзаете. Никто не сталкивался?
Sasha Beep
14
Но, может, есть уже проверенная? Наши требования такие: 1. Сообщения должны приходить из Инста в CRM оду 2. Должна быть возможность подключить несколько экаунтов Инстаграм. Р...
Alexander Sharoiko MSE / Александр Шаройко
7
Всем привет! вывожу на общей стр дочерние ресурсыв каждом ресурсе галерея, и первая фотка должна выводиться на общей [!DocLister? &prepare=photo !]
Alekso
12
Карта сайта