делаю сейчас на своих кластерах и он чушь какую-то переливает.
в чем суть чуши?
Никакие оффсеты не совпадают. Конфиг вроде беру с документации. Может есть какие-то подводные камни?
убогое решение, берите спарк либу и учите приложение писать в 2 кластера
https://kafka.apache.org/documentation/#georeplication-flow-configure Судя по доке - да.
если у вас нет всей истории с нулевого оффсета по всем топикам, то оффсеты не сохранятся
даже если есть, это всё равно плохое решение, пусть учат приложение работать с 2 кластерами сразу
там ни одной строчки про оффсеты нет, может вы спутали с сохранением порядка, что не равно сохранению номеров оффсетов
sync.group.offsets.enabled: whether to periodically write the translated offsets of replicated consumer groups (in the source cluster) to __consumer_offsets topic in target cluster, as long as no active consumers in that group are connected to the target cluster (default: false) А как это тогда понять? Я думал что это параметр включает трансляцию оффсетов.
берёте спарк либу для кафки, обнуляете оба кластера, и пишете через либу в 2 кластера, с сохранением индексов офсетов в бд(сцилла\рокс)
ну плохое это ж относительно, зато быстро и чуток помучавшись можно заставить работать в режиме лишь бы работало. А потом уже заняться реализацией того чего реально надо
это про ситуацию, когда опять же вы мирор настроили на переливку всех данных с начала времен, т.е. с нулевого оффсета и синкаются закоммиченные оффсеты консумеров, а не оффсеты записей. Как любит говорить Хрыч - дока у кафки говно) Тут короч надо между строк много чего читать
Ну у меня так и есть в принципе. Пытаюсь все перелить и переключить пользаков чтобы они не читали тоже все два раза.
тем более толоько вариант с запиисью в 2 кластера с сохранением индекса офсетов в бд
а вам действительно нужен второй кластер не размазанный один большой?
зукипер в 7 хостов? рафт пока вообще от задержк дохнет
У меня есть 2 геораспределенных кластера. Новый и старый. Вот хотел все из старого мигрировать в новый без каких либо отклонений.
это не про кафкук
от с этого и надо было начинать) сразу ясно что за проблема и почему вы пытаетесь решить ее криво
И почему же криво?
Хрыч вышел написал как действовать, сохранение оффсетов не нужно, новые ид группы выдаете и летите дальше на новой кафке
А что делать если нужны данные из топика перенести? По факту там есть топик с очень длинным хранением данных. Перенести можно, только вот опять вопрос с оффсетами.
"топик с очень длинным хранением данных" - вот и кривость подъехала, оно подразумевалось но теперь хотя бы это в открытую сказано. Процесс тот же - ставим кафку новую, настраиваем консумеры из нее, включаем переливку. Обрабатываем все данные по новой. Если по какойто несуразности у вас не идемпотентная обработка на перечитывании, то вам поможет только ручное разгребание с участием базы и ручным сопоставлением оффсетов.
А в чем кривость?
ну вы же не смогли взять стандартную тулзу и решить свою задачу - значит чтото криво. Ну а вообще это специфический антипаттерн при работе с кафкой с хранением всей истории и с ним надо опять же специфически работать раз уж вы обоснованно его выбрали и понимаете его минусы (в которые как раз вляпались) - а раз вляпались, то либо такой паттерн выбран бездумно, либо не проработали, либо реализован частично, либо не хватает квалификации понять что все это надо было проработать. Возможно просто люди ушли которые это все проектировали, а после себя ничего не оставили - тут конечно "криво" не подходит, тут уже управленческий проеб
у меня есть топики которые храняться по пол года. но мы реализовывали потому внешние индексы, пишем сразу в 2 кафки, и приложение ориентируется на номер офсета в бд а не кафке
сдается мне что тут такой подход обдуман и не просто так применен) ну и вокруг обвязочка тоже чувствуется неспроста появилась
так потому что зачастую релиз стриминга с багом и потом надо всё перечитывать и переделывать батч
На проде мм2 ещё не запускал, но тесты гонял. По доке: у мм2 именно трансляция оффсетов, а не копирование. Т.е. На кластереА у тебя приходят сообщения 101,102,103. На кластерБ они могут быть среплицированы как 11,12,14 (дубль, например, пришел между или кто записал откуда-то). ММ2 скидывает маппинг консьюмер групп, т.е. 101->11, 102->12, 103->14. Время от времени на кластереБ происходит применение этих записей к консьюмер группам на кластереБ. Если у тебя клиенты опираются на консьюмер группа только - должно работать. Если оффсеты хранятся во внешнем хранилище - ММ2 (по крайней мере без кастомного кода не поможет). Слышал, что есть пара аналогов и вроде у конфлюента репликатор копирует 1в1 за счёт копирования в кластерБ как в реплику, но сам не пробовал
да там очередная попытка переехать с кластера на кластер без "работы от разработчиков"
Конфиг можешь скинуть если не жалко? А то такое ощущение что я криво просто настраиваю у себя
Я из KIP и апачевской доки собирал. Попозже постараюсь скинуть
В заметках у меня такой конфиг остался (это в одну сторону, с префиксом топиков): ### DEFAULT PROPERTIES: replication.factor=3 replication.policy.separator=___ ### CONNECTORS: clusterA->clusterB.enabled=True clusterA->clusterB.sync.group.offsets.enabled=True clusterA->clusterB.sync.topic.configs.enabled=True clusterA->clusterB.emit.checkpoints.enabled=True clusterA->clusterB.topics=.* clusterA->clusterB.groups=.* Ещё нашёл заметки по такому списку конфигов (больше всяких), плюс здесь политика без добавления префикса топиов. По каждому конфигу можно погуглить что он значит. В КИПах описание некоторых было, по-моему ещё в доках стримзи неплохое описание было. clusterA->clusterB.enabled: true clusterA->clusterB.topics: test-topic clusterA->clusterB.replication.policy.class: org.apache.kafka.connect.mirror.IdentityReplicationPolicy clusterA->clusterB.refresh.topics.enabled: true clusterA->clusterB.sync.group.offsets.enabled: true clusterA->clusterB.sync.topic.configs.enabled: true clusterA->clusterB.refresh.groups.enabled: true clusterA->clusterB.emit.heartbeats.enabled: true clusterA->clusterB.sync.topic.acls.enabled: true clusterA->clusterB.emit.checkpoints.enabled: true
Сергей, у тебя получалось настроить копирование оффсетов? У меня идет страшный и неустранимый во времени рассинхрон, и я не могу найти причину. Или это действительно гиблая затея?
На всякий случай: что ты имеешь ввиду под оффсетами? Мм2 копирует сообщения и консьюмер группы с новыми, соответствующими сообщениям, оффсетами. Т.е. оффсеты будут новые: сообщение Х на старом кластере имело оффсет 100, а на новом это будет, например, оффсет 33. Да, копировать сообщения и консьюмер группы получалось, для этого там используется чуть меньше десятка внутренних топиков (смотрите логи авторизации или мм2, если запрещено создание топиков автоматически, как это должно быть)
У меня получается вот так: 0. Есть два кластера А и Б. Б - резервный (требование заказчика). На Б происходит перекючение, когда в основном контуре что-то сильно идет не так. 1. В кластер А мы пишем транзакционно, т.о. оффсеты сдвигаюся еще и на служебные транзакционные сообщения. 2. Кластер Б копирует оффсеты "отставая" по количеству от А как раз на число этих служебных сообщений. Т.е. в итоге часть сообщений из А остается необработанными при переходе рабты в Б. И вот кто тут неправ - непонятно. Возможно я что-то делаю неверно, но понять бы - что.
Так вам важны консьюмер группы или оффсеты сообщений (если вы эти оффсеты храните во внешнем хранилище и используете их для каких-то целей)? В случае консьюмер групп - оно настраивается. В случае именно использования оффсетов - они точно будут различаться и мм2 здесь не поможет
Наверное одно без другого в случае репликации оффсетов невозможно. Надеюсь только на MM2. Но вот что-то идет не так
Перечитайте это сообщение ещё раз о том, как работает мм2 и как он реплицирует консьюмер группы. Если этого не хватает, почитайте ещё раз документацию о том, что гарантирует мм2, а что нет. По точное соответствие оффсетов там ничего нет, только консьюмер группы с другими оффсетами, соответствующими тем же сообщениям на новом кластере
Обсуждают сегодня