Коллеги, приветствую, есть профессионалы в настройке и эксплуатации брокера очередей

Apache Kafka? есть проблемки которые дока не помогает решить.

Вопрос следующий... я использую кафку как прослойку между 2-мя скриптами. по факту это миграция данных из мускуля в монгу, где первый скрипт забирает из мускуля и кидает в кафку, второй консюмер который читает из кафки обрабатывает данные и засылает в монгу.

так вот проблема собственно с консюмером всегда.
если я делаю достаточно долгую обработку < 5sec, то при попытке сделать consumer.commit() я получаю ошибку о том что группа разбалансирована. и дальнейшая работа этого консюмера не возможно.
благо кафка пишет в ошибку достаточно подробную причину и решение. и сообщает о том что нужно увеличить max_poll_interval_ms, я раскурил что это время которое дается консюмеру на обработку сообщения, и если он выходит за рамки то менеджер группы считает консюмер мертвым. и более не принимает от него запросов. попытался увеличить. но из под консюмера и из под продюсера эти настройки не применяются... через коннект, прилошь кондуктором(прога менеджер кафки) править вручную этот параметр и только тогда он применил. поставил там 15 сек вместо 5. в одном случае это помогло... но бывает так что это не спасает и допустим после 500к записей обработанных начинается беда и отвал постоянный. буквально 5-10 записей и отвал.
пробовал уже убирать автокомит чтобы он 100% всегда ожидал ответа моего консюмера. не убирается. при этом вконце если я коммит не добавлю ручками, он ломается..... бред происходит крч....)
еще есть одна задача. где нужно внутри консюмера бесконечный цикл запустить ожидая подключения клиента. и это может продлится долго. и соответственно кафка отваливается с такой же ошибкой.
Пишу на питоне. использую коннекторы aiokafka и kafka-python, сама кафка поднята одной нодой из сборки wurstmeister/kafka

Консюмер работает в многопотоке. порядка 10-20 консюмеров в группе на один топик.

9 ответов

13 просмотров

Сколько партиций в топике?

Попробуйте лимитировать количество вычитываемых за раз записей max.poll.records чтобы укладываться во время между пулами

Александр- Автор вопроса
Nick
Попробуйте лимитировать количество вычитываемых за...

Были и такие попытки, ок ещё раз потрогаю

попробуйте увелиить значение конфига консюмера max.poll.interval.ms Если между вызовами poll() проходит больше чем max.poll.interval.ms времени, координатор выкидывает консюмер из группы и граппа переходит в имбалансное состояние. Либо же как писали выше - уменьшить max.poll.records, чтобы брать в poll меньше записей и обрабатывать их быстрее

А почему я извиняюсь не взять Kafla connect? Есть коннекторы бесплатные и для MySQL и монги. Зачем велосипед с проблемами если задача решается за два часа?

Александр- Автор вопроса
Vik Gamov
А почему я извиняюсь не взять Kafla connect? Есть ...

Имеете ввиду напрямую? Проблема в том что мне нужно стянуть данные в одном виде, а в монге они агрегированы совсем иначе, структура сильно поменялась в проекте и переезд на монгу идет. И еще я не слышал если честно о коннекторе в рамках подобной задачи, как вариант дублирования или дозалива данных да его можно использовать, но разве для миграции описанной выше, он подойдёт?

Александр
Имеете ввиду напрямую? Проблема в том что мне нужн...

Перекачиваешь данные из MySQLв кафку, агрегируешь ksqlDB, и дальше в монгу коннектором.

Александр- Автор вопроса
Vik Gamov
Перекачиваешь данные из MySQLв кафку, агрегируешь ...

А изменять данные там тоже можно? Типы данных тоже поменялись Данные в чистом виде без изменений вообще не льются, и да, некоторые данные это reference объекты как в этом случае быть?

Похожие вопросы

Обсуждают сегодня

Ребята, всем привет. Подскажите, пожалуйста, можно ли как-то через бота понять, что этого бота добавили в группу\канал и выдали ему права администратора?
Artem Stormageddon
9
Это переведённый текст с английского. Я не говорю на русском, но могу использовать переводчик Телеграм. Приветствую! Я начинающий веб-разработчик и все еще учусь. В настояще...
𐩱𐩪𐩣𐩱𐩲𐩺𐩡
2
А не хотим ли мы развлечься? 😉 Но так чтобы с пользой для наших профессиональных навыков?? 👨‍🎓👩‍🎓 Предлагаю на октябрь запланировать тестовый запуск новой командной игры "Игр...
Andrii Kurdiumov
2
Привет всем! Почему этот код не срабатывает при добавлении или удалении пользователя из чата? bot.on('chat_member', async (ctx) => { console.log(ctx); }) bot.launch({allo...
Alexander
5
у кого сколько оперативы на базе данных ?
АДИЛЬБЕК
4
Через бот апи возможно получить ID стикерпака? Не ссылку.
Vexylon [АФК до 09.09]
5
Привет Хочу сделать аналог iCloud’а для своих проектов, чтобы пользовательская информация хранилась в облаке, была доступна во всех сервисах, её можно было подсасывать везде)...
Виталий
9
В тг можно спарсить всех кто пишет в группе? Если список участников скрыт
S
3
код Event::listen('cms.page.display', function (&$content, $slug, $page, $html) { if (is_object($content)) { dump($content); } else { dump($s...
Point 111
3
Ребят, а двух-факторку для плагина Users и для бэкенда октября кто-то прикручивал? Поделитесь опытом
Constantine Anikin
4
Карта сайта