таблицы с KafkaEngine
сейчас только идёт пробные запуск
обнаружил, что:
- count() для этой таблицы возвращает скажем 70М записей
- а count(distinct уникальное_guid_поле) возвращает 52M
т.е. похоже данные прилично дублируются (?). Причем именно в СН, поскольку суммарный offset по топику примерно в районе 52М
какому значению верить? как строить аггрегирующие запросы, чтобы дублирование не портило результаты?
argMax, limit by, final Подчекнуть нужное. ReplacingMT и не гарантирует дедупликацию.
https://kb.altinity.com/engines/mergetree-table-engine-family/replacingmergetree/
если у меня таблица: guid, app_id, event_id, weight, timestamp где guid - уникальное значение и мне надо получить avg значения weight скажем по app_id или event_id за какое-то время, а каждая 3-4 строка - это дубликат (70/52 = 1.3), как можно доверять результатам агрегирующих запросов кроме min/max? Не пойму, как я это дублирование могу компенсировать argMax как-то ну очень много дублей, чтобы игнорировать вносимое ими искажение м.б. guid должен быть включён в order by / primary key? сейчас там (app_id, timestamp)
но для чего тогда может быть использован ReplicatedMergeTree с таким количеством дуликатов? просто каждая четвёртая строка - дубликат. Очень пока неожиданно, была надежда, что дублицируемость будет пониже, ну там доли процентов или типа того....
SELECT app_id, avg(weight) FROM (SELECT guid, app_id, argMax(weight, timestamp) as weight FROM table GROUP BY guid, app_id) GROUP BY app_id
разумеется у вас в таблице должен стоять order by guid, app_id, event_id - иначе дубликатов будет слишком много.
Но у вас все равно их слишком много - или вы что-то не то с Kafka engine делаете, или действительно гоните на вход много дубликатов
добавьте гуид в ключ сортировки, дубликатов должно стать меньше, у вас точно 1 МВ пишет?
так в том-то и дело, что сейчас схема максимально простая: - много приложений генерят "события", маркируют каждое гуидом, пишут их в топик кафки. В топике 12 партиций - СН состоит из двух реплик (01 и 02) в кластере. На обеих репликах созданы atomic БД с ReplicatedMergeTree таблицами. На 01 добавлены KarkaEngine + MV для перегона данных в основную таблицу. Т.е. в 02 данные пересылает только сам СН в рамках репликации. по ресурсам (железным) всем всего хватает, никто не захлёбывается всё устраивает, кроме того, что собранными данными невозможно пользоваться не понимаю, что в этой схеме провоцирует возникновение такого количества дупликатов? Как это расследовать?
это точно СН-like подход? Нигде в примерах этого не встречал. Обычно делают order by (app_id, timestamp) или типа того, не встречал, чтобы для корректной работы надо было маркировать каждое событие uid'ом и включать в его в order by и первичный ключ. Звучит подозрительно ((
стоп. мы тут говорили про преодоление дубликатов в КХ. Это ReplacingMT, в вашем случае ReplicatedReplacingMergeThree. Но проблема не тут - дубликаты порождаются до поступления в очередь или в процессе считывания из очереди. Ищите там.
хах, да, действительно, невнимательно увидел то что у вас REPLICATED, а не REPLACING. Сорри. Половина сказанного раньше - неактуальна. Boris дело говорит.
Не подозрительно - жизнь она такая, и сделать exactly-once из кафки очень непросто. Смотрите историю чата - там была недавно дискуссия и прикольное видео от ebay. Поэтому делают ReplacingMT и героически борют дубликаты (но не в ваших объемах). У нас 3 ноды в кластере c ReplicatedReplacingMergeThree - каждая читает несколько партиций кафки и отдаёт остальным все что добыла. Дубли бывают, не немного и по нашей вине.
у меня с соседнем stage окружении, где всё то же самое (структура данных, кафка, источники событий, их распределение и пр.), но нет кластера и replicated* (используется обычный MT), количество дубликатов 7к на 121М И такая доля дубликатов устраивает, можно потерпеть ради всего остального в СН. Но то, что получилось в кластере - это перебор наверно, следующим шагом проведу испытание без репликации, посмотрю, что будет с дупликатами. Может быть от RPM как-то зависит? (на проде раз в 10-20 больше, чем на стейдже)
Это странно. Engine Kafka даёт дубликаты, но это обычно сотые доли процента. У вас что то неправильно настроено. Может разные консьмер группы на репликах?
Нет это не ch подход. Надо искать источник дубликатов
таблица на kafkaengine только на реплике 01. На ней же MV, которое из KE гонит данные в основную таблицу на движке ReplicatedMT, которая также есть на реплике 02 сотые доли процента вполне устраивают. И на варианте с меньшим RPM и без реплик их и имели. Буду экспериментировать
в kafka engine есть виртуальные поля _topic _partition _offset (уточните в доке я пишу по памяти). Добавьте их в вашу таблицу, и дальше проверяйте если у строк с одинаковым uuid разные _topic _partition _offset значит это продьюсер записал их два раза
хороший совет, спасибо. Проверю
добавил параллельно ещё одно MV из той же кафки, в котором забрал только guid, _partition, _offset и timestamp стартовало всё нормально, в агрегации по партициям видно, что значения смещений хорошие, всё выглядит прилично. но спустя какое-то время, выяснилось, что для одной из партиций минимальное значение смещения стало равно 0. т.е. эту партицию для топика MV (или весь движок KE?) решил перечитать целиком. выглядит сомнительно Как это расследовать дальше? версия: 21.11.3.6
тут еще дело в том что КХ не хранит оффсеты, он использует фичи кафки он может начать с 0, если в параметрах kafka engine задано earliest, но это типа вообще постоянно с 0 начинать
ну если бы он сразу с нуля и все партиции начал читать, можно было бы подумать на настройки. Но получилось, что начал он читать как надо, а потом для одной партиции слетел на ноль и перечитал с неё всё
ещё бы понять, ЧТО там искать. Обычный лог вообще неподъёмный по объёму, а журнал ошибок забит сообщениями вида: <Error> DynamicQueryHandler: Code: 497. DB::Exception: default: Not enough privileges. To execute this query it's necessary to have grant SHOW ROLES ON *.*. (ACCESS_DENIED), Stack trace (when copying this message, always include the lines below) без понятия, о чём это, но непохоже на причину проблем с кафкой
ну в общем КХ сам такое сделать не может. вам надо удостоверится что это не новая партиция, в смысле в кафке же можно добавить партиций
<Error> DynamicQueryHandler: Code: 497. DB::Exception: default: Not enough privileges. To execute this query it's necessary to have grant SHOW ROLES ON *.*. (ACCESS_DENIED), Stack trace (when copying this message, always include the lines below) ну в логе видно кто и что пытался сделать
у меня в топике 12 партиций. Сразу после создания нового MV, минут через пять, я сделал запрос на min(offset), max(offset) по партициям и там всё было в порядке - все значения ненулевые с ожидаемым равномерным распределением. Оставил набираться информации (я ж делал для поиска дубликатов, по вашему совету) и вернулся через час. Тогда и выяснилось, что партиция 5 была считана уже начиная с 0
какая версия КХ и какие settings у kafka=engine ?
>Тогда и выяснилось, что партиция 5 была считана уже начиная с 0 такого не должно быть, это какой-то баг
version: 21.11.3.6 engine = Kafka settings kafka_broker_list = 'kafka-04:9092,kafka-05:9092,kafka-06:9092', kafka_topic_list = 'Audit.Span', kafka_group_name = 'Audit.Clickhouse.Kafka.Engine', kafka_format = 'JSONEachRow', kafka_max_block_size = 1048576;
да вот похоже на баг. Есть другая среда, где отработала похожая схема без такой ошибки (200М сообщений) но там: - версия 21.10.2.15 - кафка на одной ноде - в топике партиция 1, а не 12
у вас может настройки в конфиге КХ в теге <kafka> есть?
нет ничего - надобности вроде не возникало. Для зукипера есть, а по кафке нет ничего
а TTL у топика в кафке есть? ищите в логе КХ "no offset stored" https://github.com/ClickHouse/ClickHouse/issues/20548
в кафке выставлено 24 часа для этого топика
ищите в логе КХ "no offset stored"
> ищите в логе КХ "no offset stored" нет ни в .err.log ни .log
встречается (довольно много): StorageKafka (inbox): Polled offset INVALID (topic: Audit.Span, partition: 5) но не только для партиции 5, а как-будто для всех. Не единичные, достаточно много есть и нормальные Polled offset с нормальными числовыми значениями это может что-то значить?
ну и что было этого? грепайте StorageKafka (inbox)
ложная тревога - это я утром на другой таблице и другом топике тренировался. Вообще не пересекаются с обсуждаемыми проблемными ни по БД в СН, ни по топику в кафке :(
просмотрел весь греп за сегодня (от подключения СН к кафке с утра до текущего момента) на Polled offset (span_inbox) для пятой партиции: значения всё время не убывающее. Т.е. значение смещения для это партиции по логам возрастало. По крайней мере сгрепленным таким образом Есть в самом начале (ещё с утра) INVALID, но это не прервало возрастанием. Дальше весь день - возрастание, включая момент, когда одно из MV, подключенных Kafka Engine таблице, выбрала почему-то данные из партиции 5 с нуля Туманная история. Завтра, скорее всего, снесу всё в СН, пересоздам заново и попробую воспроизвести опять
>когда одно из MV тут какое-то недопонимание. MV не читает из kafka engine. kafka engine читает из топика что-то, кладет в буфер, и указатель на этот буфер (один и тот же) передается во все mat view
да я именно так и понимаю вроде, может формулирую некорректно. просто информацию по смещениям у меня забирало только последнее MV. Два, которые были до этого, просто данные по целевым таблицам раскладывали.
Обсуждают сегодня