Всем привет! У меня есть kafka streams приложение. Оно конзюмит с

одного топика input-topic. Делает некоторкую логику, аггрегирует и продюсит аггрегированные данные в разные топики в зависимости от некоторых условий. Для примера output-topic-1, output-topic-2, output-topic-3, и т.д.

Все топики имеют следующее ReplicationFactor:3 и PartitionCount:32

Я создал 5 инстансов своего kafka streams приложения.

32 partitions должны быть распределены между этими 5 инстансами. Все работало хорошо. Но недели через 2-3 оно ушло в loop как будто. Инстансы перестали конзюмить.Все время как будто проходил rebalance&

Было очень сьранным то, что тут не было kafka lags.

Я проверил инфо о моей group id используя kafka-consumer-groups команду. Я прикрепил ответ в следующем сообщении. Там видно что только один инстанс обслуживает все 32 партиции.

Так же тут логи с инстанса который обслуживает эти партиции. Он как будто все время делает reset оффсета и все. Поэтому нет kafka lags.

{"time":"2019-04-29T11:26:16:285","level":"INFO","thread":"kafka-streams-logs-aggregators-a699800a-5bd9-4b20-806f-22bf350504c4-StreamThread-1","logger":"o.a.k.c.consumer.internals.Fetcher","message":"[Consumer clientId=kafka-streams-logs-aggregators-a699800a-5bd9-4b20-806f-22bf350504c4-StreamThread-1-consumer, groupId=kafka-streams-logs-aggregators] Resetting offset for partition input-topic-20 to offset 104184.","app":"kafka-streams-logs-aggregators","env":"dev","version":"240"}

{"time":"2019-04-29T11:26:32:800","level":"INFO","thread":"kafka-streams-logs-aggregators-8062ed33-056d-40d5-a26b-2c271d690d67-StreamThread-1","logger":"o.a.k.s.p.i.StoreChangelogReader","message":"stream-thread [kafka-streams-logs-aggregators-8062ed33-056d-40d5-a26b-2c271d690d67-StreamThread-1] Restoring task 0_10's state store KSTREAM-AGGREGATE-STATE-STORE-0000000029 from beginning of the changelog kafka-streams-logs-aggregators-KSTREAM-AGGREGATE-STATE-STORE-0000000029-changelog-10 ","app":"kafka-streams-logs-aggregators","env":"dev","version":"240"}
Кто-нибудь сталкивался с такой проблемой работая с kafka streams? Надеюсь описал проблему подробно. Дайте знать плз, если нужна еще инфа.

1 ответов

4 просмотра

Конфигурация приложения приложи

Похожие вопросы

Обсуждают сегодня

Но, может, есть уже проверенная? Наши требования такие: 1. Сообщения должны приходить из Инста в CRM оду 2. Должна быть возможность подключить несколько экаунтов Инстаграм. Р...
Alexander Sharoiko MSE / Александр Шаройко
8
Это может быть все-таки не флудвейт? у меня ботфазер принимает изменения и отображает даже что они изменились, на видео видно что он прислал якобы уже измененное описание, н...
OVERLINK
13
Я правильно понимаю что нет способов получить список ожидающих заявок на вступление в группу с помощью бота из mtproto?
Шамиль Прилов
7
Добрый день. Мне посоветовали обратиться к вам в чат за помощью. Ситуация описана на скрине. Как мне сказали, мне на бота навесили флудвейт. Есть ли возможность снять его ра...
OVERLINK
7
всем привет помогите пожалуйста используя CDN (GCP) у игроков из вьетнама загружается конфиг (размер 999 bytes) загружается 5 и более минут н а других CDN сервисах такой пробл...
Andrew Krw.
1
Просто по очереди выпиливаешь на ручной маппинг? По методу за раз
Andrii Kurdiumov
7
Приветствую. А не подскажете какие ограничения есть на использования api метода setMyName ? Несколько раз сменил имя бота и получил бан на 2 месяца на смену имени.
Slick Slack
8
Привет, коллеги! Возникла задача ограничить максимальный размер вложений для определённых расширений, например, чтобы для изображений лимит был 10 МБ, а для видео — 100 МБ. Ог...
Andro
1
Всем привет! Взялся портировать модули на 18 версию, лезет _logger.log(log_level, 'no translation language detected, skipping translation %s', frame, stack_info=True) А чт...
Max Lit
3
Доброе утро, а кто то делал Google аналитику через php ? curl_setopt($ch, CURLOPT_NOSIGNAL, true); Это должно быть async без ожидания ответа. Вообще php нормально с таким с...
Max Dubovsky
9
Карта сайта