Добрый день. Получаю сообщения из Kafka и если их долго обрабатываю,

то приложение на коммите зависает.
Вот часть кода:
while (true) {
records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
Логика… // Если тут долгое выполнение, приложение зависает на коммите
}
log.debug("Начинаю коммит");
consumer.commitSync();
log.debug("Закончил коммит");
}
В логах вижу только Начинаю коммит, строчка о том, что коммит завершен не выводится.
Как исправить данную проблему?

5 ответов

45 просмотров

(просто предположение) По симптомам похоже, что нужно увеличить max.poll.interval до разумных пределов (>>> времени обработки) или через pause()/resume() обрабатывать сообщения, делая холостые poll(...). Скорее всего, где-то в логах должны быть сообщения о том, что группа заребалансилась и коммит не может быть сделан

Maksim-Batsiuk 💬 Автор вопроса
Nikita Ryanov
(просто предположение) По симптомам похоже, что ну...

2021-07-26 15:25:52 [DEBUG][1-thread-1] - Начинаю коммит 2021-07-26 15:25:52 [DEBUG][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Sending FindCoordinator request to broker 10.56.107.64:9092 (id: 6 rack: null) 2021-07-26 15:25:52 [DEBUG][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=27) and timeout 30000 to node 6: {key=sdlogging,key_type=0,_tagged_fields={}} 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FIND_COORDINATOR response from node 6 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=27): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FindCoordinator response ClientResponse(receivedTimeMs=1627302352285, latencyMs=2, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=27), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092)) 2021-07-26 15:25:52 [INFO ][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Discovered group coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) 2021-07-26 15:25:52 [INFO ][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Group coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Sending FindCoordinator request to broker 10.56.107.66:9092 (id: 5 rack: null) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=28) and timeout 30000 to node 5: {key=sdlogging,key_type=0,_tagged_fields={}} 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FIND_COORDINATOR response from node 5 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=28): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FindCoordinator response ClientResponse(receivedTimeMs=1627302352388, latencyMs=2, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=28), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092)) 2021-07-26 15:25:52 [INFO ][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Discovered group coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Initiating connection to node 10.56.107.41:9092 (id: 2147483646 rack: null) using address /10.56.107.41 2021-07-26 15:25:52 [INFO ][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Member test_1-850de298-a8af-4a9d-ae82-ef626bc05ebc sending LeaveGroup request to coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. 2021-07-26 15:25:52 [DEBUG][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Resetting generation due to consumer pro-actively leaving the group

Maksim-Batsiuk 💬 Автор вопроса
Nikita Ryanov
(просто предположение) По симптомам похоже, что ну...

а разумные размеры это какие? я установил 50_000 и можно ли например 10 мин установить?

Maksim Batsiuk 💬
а разумные размеры это какие? я установил 50_000 и...

Все зависит от приложения. Но имо 10 мин это не оч разумный предел

Maksim-Batsiuk 💬 Автор вопроса

Похожие вопросы

Обсуждают сегодня

Добрый вечер, Пока не совсем понимаю как наладить общение между телеграм ботом и ПО для работы с сим боксом. По самому боту так понял: - Нужен некий баланс, который можно поп...
Magic
6
Объясните, пожалуйста, почему компилятор ругается на использование в условии неинициализированной переменной: int x; Task.Run(async () => { x = await somefunc(); }).Wait...
Александр
5
Всем привет. Ребята, подскажите, пожалуйста. у ботов есть ограничение на отправку сообщений - 30 сообщений в секунду, эти ограничения накладываются на все сообщения? или на со...
Artem Stormageddon
4
Блин, ребята, сори за тупые вопросы. А можно ли как-то открыть вебапку по нажатию на кнопку в меню(которое появляется слева, команды)?
Artem Stormageddon
3
Коллеги, может знает кто, можно ли цвет бейджа счётчика в BackendMenu менять без бубнов?
Alex Blaze
3
а плаксы из-под питона умеют только в комфортных условиях что-то выдавить из себя?)
Lencore
9
Привет!) Кто как юзает переменные в строках?) Чисто ради интереса Вот так: echo "У меня {$bananasAmount} бананов"; Или вот так: echo "У меня ${bananasAmount} бананов";
Виталий
3
Но, может, есть уже проверенная? Наши требования такие: 1. Сообщения должны приходить из Инста в CRM оду 2. Должна быть возможность подключить несколько экаунтов Инстаграм. Р...
Alexander Sharoiko MSE / Александр Шаройко
13
разработчик ботов скидывает портфолио, боты которые он уже создал. А вот как узнать что это именно он их создал?
Gosudar
4
Это может быть все-таки не флудвейт? у меня ботфазер принимает изменения и отображает даже что они изменились, на видео видно что он прислал якобы уже измененное описание, н...
OVERLINK
13
Карта сайта