то приложение на коммите зависает.
Вот часть кода:
while (true) {
records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
Логика… // Если тут долгое выполнение, приложение зависает на коммите
}
log.debug("Начинаю коммит");
consumer.commitSync();
log.debug("Закончил коммит");
}
В логах вижу только Начинаю коммит, строчка о том, что коммит завершен не выводится.
Как исправить данную проблему?
(просто предположение) По симптомам похоже, что нужно увеличить max.poll.interval до разумных пределов (>>> времени обработки) или через pause()/resume() обрабатывать сообщения, делая холостые poll(...). Скорее всего, где-то в логах должны быть сообщения о том, что группа заребалансилась и коммит не может быть сделан
2021-07-26 15:25:52 [DEBUG][1-thread-1] - Начинаю коммит 2021-07-26 15:25:52 [DEBUG][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Sending FindCoordinator request to broker 10.56.107.64:9092 (id: 6 rack: null) 2021-07-26 15:25:52 [DEBUG][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=27) and timeout 30000 to node 6: {key=sdlogging,key_type=0,_tagged_fields={}} 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FIND_COORDINATOR response from node 6 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=27): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FindCoordinator response ClientResponse(receivedTimeMs=1627302352285, latencyMs=2, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=27), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092)) 2021-07-26 15:25:52 [INFO ][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Discovered group coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) 2021-07-26 15:25:52 [INFO ][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Group coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Sending FindCoordinator request to broker 10.56.107.66:9092 (id: 5 rack: null) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Sending FIND_COORDINATOR request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=28) and timeout 30000 to node 5: {key=sdlogging,key_type=0,_tagged_fields={}} 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FIND_COORDINATOR response from node 5 for request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=28): FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Received FindCoordinator response ClientResponse(receivedTimeMs=1627302352388, latencyMs=2, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=test_1, correlationId=28), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=1, host='10.56.107.41', port=9092)) 2021-07-26 15:25:52 [INFO ][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Discovered group coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) 2021-07-26 15:25:52 [DEBUG][1-thread-1] - [Consumer clientId=test_1, groupId=sdlogging] Initiating connection to node 10.56.107.41:9092 (id: 2147483646 rack: null) using address /10.56.107.41 2021-07-26 15:25:52 [INFO ][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Member test_1-850de298-a8af-4a9d-ae82-ef626bc05ebc sending LeaveGroup request to coordinator 10.56.107.41:9092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. 2021-07-26 15:25:52 [DEBUG][ sdlogging] - [Consumer clientId=test_1, groupId=sdlogging] Resetting generation due to consumer pro-actively leaving the group
а разумные размеры это какие? я установил 50_000 и можно ли например 10 мин установить?
Все зависит от приложения. Но имо 10 мин это не оч разумный предел
понял, значит оставлю свое значение
Обсуждают сегодня