Добрый день. Получаю сообщения из Kafka и если их долго обрабатываю,

то приложение на коммите зависает.
Вот часть кода:
while (true) {
records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
Логика… // Если тут долгое выполнение, приложение зависает на коммите
}
log.debug("Начинаю коммит");
consumer.commitSync();
log.debug("Закончил коммит");
}
В логах вижу только Начинаю коммит, строчка о том, что коммит завершен не выводится.
Как исправить данную проблему?

5 ответов

33 просмотра

(просто предположение) По симптомам похоже, что нужно увеличить max.poll.interval до разумных пределов (>>> времени обработки) или через pause()/resume() обрабатывать сообщения, делая холостые poll(...). Скорее всего, где-то в логах должны быть сообщения о том, что группа заребалансилась и коммит не может быть сделан

Maksim-Batsiuk 💬 Автор вопроса
Nikita Ryanov
(просто предположение) По симптомам похоже, что ну...

2021-07-26 15:25:52 [DEBUG][1-thread-1] - Начинаю коммит 2021-07-26 15:25:52 [DEBUG][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Sending FindCoordinator request to broker 10.56.107.64:9092 (id: 6 rack: null) 2021-07-26 15:25:52 [DEBUG][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=27) and timeout 30000 to node 6: {key=sdlogging,key_type=0,_tagged_fields={}} 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FIND_COORDINATOR response from node 6 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=27): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FindCoordinator response ClientResponse(receivedTimeMs=1627302352285, latencyMs=2, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=27), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092)) 2021-07-26 15:25:52 [INFO ][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Discovered group coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) 2021-07-26 15:25:52 [INFO ][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Group coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Sending FindCoordinator request to broker 10.56.107.66:9092 (id: 5 rack: null) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=28) and timeout 30000 to node 5: {key=sdlogging,key_type=0,_tagged_fields={}} 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FIND_COORDINATOR response from node 5 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=28): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FindCoordinator response ClientResponse(receivedTimeMs=1627302352388, latencyMs=2, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=28), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092)) 2021-07-26 15:25:52 [INFO ][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Discovered group coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Initiating connection to node 10.56.107.41:9092 (id: 2147483646 rack: null) using address /10.56.107.41 2021-07-26 15:25:52 [INFO ][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Member test_1-850de298-a8af-4a9d-ae82-ef626bc05ebc sending LeaveGroup request to coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. 2021-07-26 15:25:52 [DEBUG][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Resetting generation due to consumer pro-actively leaving the group

Maksim-Batsiuk 💬 Автор вопроса
Nikita Ryanov
(просто предположение) По симптомам похоже, что ну...

а разумные размеры это какие? я установил 50_000 и можно ли например 10 мин установить?

Maksim Batsiuk 💬
а разумные размеры это какие? я установил 50_000 и...

Все зависит от приложения. Но имо 10 мин это не оч разумный предел

Maksim-Batsiuk 💬 Автор вопроса

Похожие вопросы

Обсуждают сегодня

Подскажите, а есть vault lite или ченить такое?) А то нужен вольт для похода в вольт, но весит он ~500 мб) как-то многовато для парочки запросов ))
Alexandr Orloff
17
Всем привет, есть небольшая проблема Есть такой скрипт document.addEventListener('DOMContentLoaded', function () { const sliderTabs = document.querySelectorAll('.s...
A da
8
@go1337 @dblackCat Привет. Все ещё дрочусь с fastpanel. Добавил второй домен который должен смотреть в рут того же сайта, но так как это просто домен, а не сайт, я не могу ему...
Ross 🦴
9
До речі, в ево нема можливості чи якого розширення щоб з адмінки з телефона зайти і терміново щось в верстці поправити?
Женя
7
кто-нибудь пользуется тайм-трекерами во время работы? так много разных нагуглил, может есть что-то популярное
Lencore
8
Пацаны. Я разрабатываю софт для инвайтинга на телетон, и столкнулся с такой проблемой, в один из чатов не могу приглашать никого, не дает добавлять, в то же время через официа...
Kernel Panic
11
Скажите, а кому нужен Currency как отдельный плагин вместо полноценного ecommerce в OctoberCMS? Кто-то использует его уже или планирует в будущем? Может я что-то не понимаю?
Igor
13
Розмовами про Рево мені нагадали часи, коли шаблони правилися прямо в адмінці. Хто в курсі, чому відійшли від цієї практики, так блейд не працює? Доволі зручно ж було (інколи)
Женя
3
Всем добрый вечер, Рад оказаться в кругу единомышленников. Начинаю погружаться в мир .net веба. Зовут Ерасыл 🖖 У меня назрел вопрос: Какой процент проектов, прошедшие через в...
Ерасыл
6
Чому? Да тому що без GiT не уявляю нормального проекта а коли код в базі то то так собі
Dmytro Lukianenko
3
Карта сайта