Коллеги, приветствую, есть профессионалы в настройке и эксплуатации брокера очередей

Apache Kafka? есть проблемки которые дока не помогает решить.

Вопрос следующий... я использую кафку как прослойку между 2-мя скриптами. по факту это миграция данных из мускуля в монгу, где первый скрипт забирает из мускуля и кидает в кафку, второй консюмер который читает из кафки обрабатывает данные и засылает в монгу.

так вот проблема собственно с консюмером всегда.
если я делаю достаточно долгую обработку < 5sec, то при попытке сделать consumer.commit() я получаю ошибку о том что группа разбалансирована. и дальнейшая работа этого консюмера не возможно.
благо кафка пишет в ошибку достаточно подробную причину и решение. и сообщает о том что нужно увеличить max_poll_interval_ms, я раскурил что это время которое дается консюмеру на обработку сообщения, и если он выходит за рамки то менеджер группы считает консюмер мертвым. и более не принимает от него запросов. попытался увеличить. но из под консюмера и из под продюсера эти настройки не применяются... через коннект, прилошь кондуктором(прога менеджер кафки) править вручную этот параметр и только тогда он применил. поставил там 15 сек вместо 5. в одном случае это помогло... но бывает так что это не спасает и допустим после 500к записей обработанных начинается беда и отвал постоянный. буквально 5-10 записей и отвал.
пробовал уже убирать автокомит чтобы он 100% всегда ожидал ответа моего консюмера. не убирается. при этом вконце если я коммит не добавлю ручками, он ломается..... бред происходит крч....)
еще есть одна задача. где нужно внутри консюмера бесконечный цикл запустить ожидая подключения клиента. и это может продлится долго. и соответственно кафка отваливается с такой же ошибкой.
Пишу на питоне. использую коннекторы aiokafka и kafka-python, сама кафка поднята одной нодой из сборки wurstmeister/kafka

Консюмер работает в многопотоке. порядка 10-20 консюмеров в группе на один топик.

9 ответов

17 просмотров

Сколько партиций в топике?

Попробуйте лимитировать количество вычитываемых за раз записей max.poll.records чтобы укладываться во время между пулами

Александр- Автор вопроса
Nick
Попробуйте лимитировать количество вычитываемых за...

Были и такие попытки, ок ещё раз потрогаю

попробуйте увелиить значение конфига консюмера max.poll.interval.ms Если между вызовами poll() проходит больше чем max.poll.interval.ms времени, координатор выкидывает консюмер из группы и граппа переходит в имбалансное состояние. Либо же как писали выше - уменьшить max.poll.records, чтобы брать в poll меньше записей и обрабатывать их быстрее

А почему я извиняюсь не взять Kafla connect? Есть коннекторы бесплатные и для MySQL и монги. Зачем велосипед с проблемами если задача решается за два часа?

Александр- Автор вопроса
Vik Gamov
А почему я извиняюсь не взять Kafla connect? Есть ...

Имеете ввиду напрямую? Проблема в том что мне нужно стянуть данные в одном виде, а в монге они агрегированы совсем иначе, структура сильно поменялась в проекте и переезд на монгу идет. И еще я не слышал если честно о коннекторе в рамках подобной задачи, как вариант дублирования или дозалива данных да его можно использовать, но разве для миграции описанной выше, он подойдёт?

Александр
Имеете ввиду напрямую? Проблема в том что мне нужн...

Перекачиваешь данные из MySQLв кафку, агрегируешь ksqlDB, и дальше в монгу коннектором.

Александр- Автор вопроса
Vik Gamov
Перекачиваешь данные из MySQLв кафку, агрегируешь ...

А изменять данные там тоже можно? Типы данных тоже поменялись Данные в чистом виде без изменений вообще не льются, и да, некоторые данные это reference объекты как в этом случае быть?

Похожие вопросы

Обсуждают сегодня

Добрый вечер, Пока не совсем понимаю как наладить общение между телеграм ботом и ПО для работы с сим боксом. По самому боту так понял: - Нужен некий баланс, который можно поп...
Magic
6
сделал сайт, прикрутил в боте сайт, и виджет логина. как автоматически логинить пользователя в аккаунт(телеграм), при входе с бота?
Александра Чернивецкая
5
Объясните, пожалуйста, почему компилятор ругается на использование в условии неинициализированной переменной: int x; Task.Run(async () => { x = await somefunc(); }).Wait...
Александр
5
Ребят, подскажите, пожалуйста, почему в префиксе к ассетам, которые генерируются через фильтр | theme в шаблоне, стал вдруг появляться index.php? Вот так выглядит ссылка на а...
Виталий
1
Всем привет. Ребята, подскажите, пожалуйста. у ботов есть ограничение на отправку сообщений - 30 сообщений в секунду, эти ограничения накладываются на все сообщения? или на со...
Artem Stormageddon
4
Блин, ребята, сори за тупые вопросы. А можно ли как-то открыть вебапку по нажатию на кнопку в меню(которое появляется слева, команды)?
Artem Stormageddon
3
а плаксы из-под питона умеют только в комфортных условиях что-то выдавить из себя?)
Lencore
9
Но, может, есть уже проверенная? Наши требования такие: 1. Сообщения должны приходить из Инста в CRM оду 2. Должна быть возможность подключить несколько экаунтов Инстаграм. Р...
Alexander Sharoiko MSE / Александр Шаройко
13
Это может быть все-таки не флудвейт? у меня ботфазер принимает изменения и отображает даже что они изменились, на видео видно что он прислал якобы уже измененное описание, н...
OVERLINK
13
Коллеги, может знает кто, можно ли цвет бейджа счётчика в BackendMenu менять без бубнов?
Alex Blaze
3
Карта сайта