Apache Kafka? есть проблемки которые дока не помогает решить.
Вопрос следующий... я использую кафку как прослойку между 2-мя скриптами. по факту это миграция данных из мускуля в монгу, где первый скрипт забирает из мускуля и кидает в кафку, второй консюмер который читает из кафки обрабатывает данные и засылает в монгу.
так вот проблема собственно с консюмером всегда.
если я делаю достаточно долгую обработку < 5sec, то при попытке сделать consumer.commit() я получаю ошибку о том что группа разбалансирована. и дальнейшая работа этого консюмера не возможно.
благо кафка пишет в ошибку достаточно подробную причину и решение. и сообщает о том что нужно увеличить max_poll_interval_ms, я раскурил что это время которое дается консюмеру на обработку сообщения, и если он выходит за рамки то менеджер группы считает консюмер мертвым. и более не принимает от него запросов. попытался увеличить. но из под консюмера и из под продюсера эти настройки не применяются... через коннект, прилошь кондуктором(прога менеджер кафки) править вручную этот параметр и только тогда он применил. поставил там 15 сек вместо 5. в одном случае это помогло... но бывает так что это не спасает и допустим после 500к записей обработанных начинается беда и отвал постоянный. буквально 5-10 записей и отвал.
пробовал уже убирать автокомит чтобы он 100% всегда ожидал ответа моего консюмера. не убирается. при этом вконце если я коммит не добавлю ручками, он ломается..... бред происходит крч....)
еще есть одна задача. где нужно внутри консюмера бесконечный цикл запустить ожидая подключения клиента. и это может продлится долго. и соответственно кафка отваливается с такой же ошибкой.
Пишу на питоне. использую коннекторы aiokafka и kafka-python, сама кафка поднята одной нодой из сборки wurstmeister/kafka
Консюмер работает в многопотоке. порядка 10-20 консюмеров в группе на один топик.
Сколько партиций в топике?
Попробуйте лимитировать количество вычитываемых за раз записей max.poll.records чтобы укладываться во время между пулами
Были и такие попытки, ок ещё раз потрогаю
попробуйте увелиить значение конфига консюмера max.poll.interval.ms Если между вызовами poll() проходит больше чем max.poll.interval.ms времени, координатор выкидывает консюмер из группы и граппа переходит в имбалансное состояние. Либо же как писали выше - уменьшить max.poll.records, чтобы брать в poll меньше записей и обрабатывать их быстрее
А почему я извиняюсь не взять Kafla connect? Есть коннекторы бесплатные и для MySQL и монги. Зачем велосипед с проблемами если задача решается за два часа?
Имеете ввиду напрямую? Проблема в том что мне нужно стянуть данные в одном виде, а в монге они агрегированы совсем иначе, структура сильно поменялась в проекте и переезд на монгу идет. И еще я не слышал если честно о коннекторе в рамках подобной задачи, как вариант дублирования или дозалива данных да его можно использовать, но разве для миграции описанной выше, он подойдёт?
Перекачиваешь данные из MySQLв кафку, агрегируешь ksqlDB, и дальше в монгу коннектором.
А изменять данные там тоже можно? Типы данных тоже поменялись Данные в чистом виде без изменений вообще не льются, и да, некоторые данные это reference объекты как в этом случае быть?
Обсуждают сегодня