Добрый день. Получаю сообщения из Kafka и если их долго обрабатываю,

то приложение на коммите зависает.
Вот часть кода:
while (true) {
records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
Логика… // Если тут долгое выполнение, приложение зависает на коммите
}
log.debug("Начинаю коммит");
consumer.commitSync();
log.debug("Закончил коммит");
}
В логах вижу только Начинаю коммит, строчка о том, что коммит завершен не выводится.
Как исправить данную проблему?

5 ответов

49 просмотров

(просто предположение) По симптомам похоже, что нужно увеличить max.poll.interval до разумных пределов (>>> времени обработки) или через pause()/resume() обрабатывать сообщения, делая холостые poll(...). Скорее всего, где-то в логах должны быть сообщения о том, что группа заребалансилась и коммит не может быть сделан

Maksim-Batsiuk 💬 Автор вопроса
Nikita Ryanov
(просто предположение) По симптомам похоже, что ну...

2021-07-26 15:25:52 [DEBUG][1-thread-1] - Начинаю коммит 2021-07-26 15:25:52 [DEBUG][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Sending FindCoordinator request to broker 10.56.107.64:9092 (id: 6 rack: null) 2021-07-26 15:25:52 [DEBUG][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=27) and timeout 30000 to node 6: {key=sdlogging,key_type=0,_tagged_fields={}} 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FIND_COORDINATOR response from node 6 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=27): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FindCoordinator response ClientResponse(receivedTimeMs=1627302352285, latencyMs=2, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=27), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092)) 2021-07-26 15:25:52 [INFO ][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Discovered group coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) 2021-07-26 15:25:52 [INFO ][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Group coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Sending FindCoordinator request to broker 10.56.107.66:9092 (id: 5 rack: null) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=28) and timeout 30000 to node 5: {key=sdlogging,key_type=0,_tagged_fields={}} 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FIND_COORDINATOR response from node 5 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=28): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FindCoordinator response ClientResponse(receivedTimeMs=1627302352388, latencyMs=2, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=28), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092)) 2021-07-26 15:25:52 [INFO ][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Discovered group coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Initiating connection to node 10.56.107.41:9092 (id: 2147483646 rack: null) using address /10.56.107.41 2021-07-26 15:25:52 [INFO ][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Member test_1-850de298-a8af-4a9d-ae82-ef626bc05ebc sending LeaveGroup request to coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. 2021-07-26 15:25:52 [DEBUG][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Resetting generation due to consumer pro-actively leaving the group

Maksim-Batsiuk 💬 Автор вопроса
Nikita Ryanov
(просто предположение) По симптомам похоже, что ну...

а разумные размеры это какие? я установил 50_000 и можно ли например 10 мин установить?

Maksim Batsiuk 💬
а разумные размеры это какие? я установил 50_000 и...

Все зависит от приложения. Но имо 10 мин это не оч разумный предел

Maksim-Batsiuk 💬 Автор вопроса

Похожие вопросы

Обсуждают сегодня

Добрый вечер, Пока не совсем понимаю как наладить общение между телеграм ботом и ПО для работы с сим боксом. По самому боту так понял: - Нужен некий баланс, который можно поп...
Magic
6
сделал сайт, прикрутил в боте сайт, и виджет логина. как автоматически логинить пользователя в аккаунт(телеграм), при входе с бота?
Александра Чернивецкая
5
Объясните, пожалуйста, почему компилятор ругается на использование в условии неинициализированной переменной: int x; Task.Run(async () => { x = await somefunc(); }).Wait...
Александр
5
Ребят, подскажите, пожалуйста, почему в префиксе к ассетам, которые генерируются через фильтр | theme в шаблоне, стал вдруг появляться index.php? Вот так выглядит ссылка на а...
Виталий
1
Всем привет. Ребята, подскажите, пожалуйста. у ботов есть ограничение на отправку сообщений - 30 сообщений в секунду, эти ограничения накладываются на все сообщения? или на со...
Artem Stormageddon
4
Блин, ребята, сори за тупые вопросы. А можно ли как-то открыть вебапку по нажатию на кнопку в меню(которое появляется слева, команды)?
Artem Stormageddon
3
а плаксы из-под питона умеют только в комфортных условиях что-то выдавить из себя?)
Lencore
9
Но, может, есть уже проверенная? Наши требования такие: 1. Сообщения должны приходить из Инста в CRM оду 2. Должна быть возможность подключить несколько экаунтов Инстаграм. Р...
Alexander Sharoiko MSE / Александр Шаройко
13
Это может быть все-таки не флудвейт? у меня ботфазер принимает изменения и отображает даже что они изменились, на видео видно что он прислал якобы уже измененное описание, н...
OVERLINK
13
Коллеги, может знает кто, можно ли цвет бейджа счётчика в BackendMenu менять без бубнов?
Alex Blaze
3
Карта сайта