184 похожих чатов

Добрый день! заполняем целевую таблицу с движком ReplicatedMergeTree через MV из

таблицы с KafkaEngine
сейчас только идёт пробные запуск
обнаружил, что:
- count() для этой таблицы возвращает скажем 70М записей
- а count(distinct уникальное_guid_поле) возвращает 52M

т.е. похоже данные прилично дублируются (?). Причем именно в СН, поскольку суммарный offset по топику примерно в районе 52М
какому значению верить? как строить аггрегирующие запросы, чтобы дублирование не портило результаты?

43 ответов

21 просмотр

argMax, limit by, final Подчекнуть нужное. ReplacingMT и не гарантирует дедупликацию.

https://kb.altinity.com/engines/mergetree-table-engine-family/replacingmergetree/

Alex-Spiridonov Автор вопроса
Clir
argMax, limit by, final Подчекнуть нужное. Replaci...

если у меня таблица: guid, app_id, event_id, weight, timestamp где guid - уникальное значение и мне надо получить avg значения weight скажем по app_id или event_id за какое-то время, а каждая 3-4 строка - это дубликат (70/52 = 1.3), как можно доверять результатам агрегирующих запросов кроме min/max? Не пойму, как я это дублирование могу компенсировать argMax как-то ну очень много дублей, чтобы игнорировать вносимое ими искажение м.б. guid должен быть включён в order by / primary key? сейчас там (app_id, timestamp)

Alex-Spiridonov Автор вопроса
Boris
https://kb.altinity.com/engines/mergetree-table-en...

но для чего тогда может быть использован ReplicatedMergeTree с таким количеством дуликатов? просто каждая четвёртая строка - дубликат. Очень пока неожиданно, была надежда, что дублицируемость будет пониже, ну там доли процентов или типа того....

Alex Spiridonov
если у меня таблица: guid, app_id, event_id, weigh...

SELECT app_id, avg(weight) FROM (SELECT guid, app_id, argMax(weight, timestamp) as weight FROM table GROUP BY guid, app_id) GROUP BY app_id

Alex Spiridonov
но для чего тогда может быть использован Replicate...

разумеется у вас в таблице должен стоять order by guid, app_id, event_id - иначе дубликатов будет слишком много.

Alex Spiridonov
но для чего тогда может быть использован Replicate...

Но у вас все равно их слишком много - или вы что-то не то с Kafka engine делаете, или действительно гоните на вход много дубликатов

Alex Spiridonov
если у меня таблица: guid, app_id, event_id, weigh...

добавьте гуид в ключ сортировки, дубликатов должно стать меньше, у вас точно 1 МВ пишет?

Alex-Spiridonov Автор вопроса
Boris
Но у вас все равно их слишком много - или вы что-т...

так в том-то и дело, что сейчас схема максимально простая: - много приложений генерят "события", маркируют каждое гуидом, пишут их в топик кафки. В топике 12 партиций - СН состоит из двух реплик (01 и 02) в кластере. На обеих репликах созданы atomic БД с ReplicatedMergeTree таблицами. На 01 добавлены KarkaEngine + MV для перегона данных в основную таблицу. Т.е. в 02 данные пересылает только сам СН в рамках репликации. по ресурсам (железным) всем всего хватает, никто не захлёбывается всё устраивает, кроме того, что собранными данными невозможно пользоваться не понимаю, что в этой схеме провоцирует возникновение такого количества дупликатов? Как это расследовать?

Alex-Spiridonov Автор вопроса
Clir
добавьте гуид в ключ сортировки, дубликатов должно...

это точно СН-like подход? Нигде в примерах этого не встречал. Обычно делают order by (app_id, timestamp) или типа того, не встречал, чтобы для корректной работы надо было маркировать каждое событие uid'ом и включать в его в order by и первичный ключ. Звучит подозрительно ((

Alex Spiridonov
так в том-то и дело, что сейчас схема максимально ...

стоп. мы тут говорили про преодоление дубликатов в КХ. Это ReplacingMT, в вашем случае ReplicatedReplacingMergeThree. Но проблема не тут - дубликаты порождаются до поступления в очередь или в процессе считывания из очереди. Ищите там.

Alex Spiridonov
это точно СН-like подход? Нигде в примерах этого н...

хах, да, действительно, невнимательно увидел то что у вас REPLICATED, а не REPLACING. Сорри. Половина сказанного раньше - неактуальна. Boris дело говорит.

Alex Spiridonov
это точно СН-like подход? Нигде в примерах этого н...

Не подозрительно - жизнь она такая, и сделать exactly-once из кафки очень непросто. Смотрите историю чата - там была недавно дискуссия и прикольное видео от ebay. Поэтому делают ReplacingMT и героически борют дубликаты (но не в ваших объемах). У нас 3 ноды в кластере c ReplicatedReplacingMergeThree - каждая читает несколько партиций кафки и отдаёт остальным все что добыла. Дубли бывают, не немного и по нашей вине.

Alex-Spiridonov Автор вопроса
Boris
Не подозрительно - жизнь она такая, и сделать exac...

у меня с соседнем stage окружении, где всё то же самое (структура данных, кафка, источники событий, их распределение и пр.), но нет кластера и replicated* (используется обычный MT), количество дубликатов 7к на 121М И такая доля дубликатов устраивает, можно потерпеть ради всего остального в СН. Но то, что получилось в кластере - это перебор наверно, следующим шагом проведу испытание без репликации, посмотрю, что будет с дупликатами. Может быть от RPM как-то зависит? (на проде раз в 10-20 больше, чем на стейдже)

Это странно. Engine Kafka даёт дубликаты, но это обычно сотые доли процента. У вас что то неправильно настроено. Может разные консьмер группы на репликах?

Alex Spiridonov
это точно СН-like подход? Нигде в примерах этого н...

Нет это не ch подход. Надо искать источник дубликатов

Alex-Spiridonov Автор вопроса
Denny [Altinity]
Это странно. Engine Kafka даёт дубликаты, но это о...

таблица на kafkaengine только на реплике 01. На ней же MV, которое из KE гонит данные в основную таблицу на движке ReplicatedMT, которая также есть на реплике 02 сотые доли процента вполне устраивают. И на варианте с меньшим RPM и без реплик их и имели. Буду экспериментировать

Alex Spiridonov
таблица на kafkaengine только на реплике 01. На не...

в kafka engine есть виртуальные поля _topic _partition _offset (уточните в доке я пишу по памяти). Добавьте их в вашу таблицу, и дальше проверяйте если у строк с одинаковым uuid разные _topic _partition _offset значит это продьюсер записал их два раза

Alex-Spiridonov Автор вопроса
Alex-Spiridonov Автор вопроса
Denny [Altinity]
в kafka engine есть виртуальные поля _topic _parti...

добавил параллельно ещё одно MV из той же кафки, в котором забрал только guid, _partition, _offset и timestamp стартовало всё нормально, в агрегации по партициям видно, что значения смещений хорошие, всё выглядит прилично. но спустя какое-то время, выяснилось, что для одной из партиций минимальное значение смещения стало равно 0. т.е. эту партицию для топика MV (или весь движок KE?) решил перечитать целиком. выглядит сомнительно Как это расследовать дальше? версия: 21.11.3.6

Alex Spiridonov
screenshot добавил параллельно ещё одно MV из той же кафки, в...

тут еще дело в том что КХ не хранит оффсеты, он использует фичи кафки он может начать с 0, если в параметрах kafka engine задано earliest, но это типа вообще постоянно с 0 начинать

Alex-Spiridonov Автор вопроса
Denny [Altinity]
тут еще дело в том что КХ не хранит оффсеты, он ис...

ну если бы он сразу с нуля и все партиции начал читать, можно было бы подумать на настройки. Но получилось, что начал он читать как надо, а потом для одной партиции слетел на ноль и перечитал с неё всё

Alex-Spiridonov Автор вопроса
Denny [Altinity]
ну лог КХ читать

ещё бы понять, ЧТО там искать. Обычный лог вообще неподъёмный по объёму, а журнал ошибок забит сообщениями вида: <Error> DynamicQueryHandler: Code: 497. DB::Exception: default: Not enough privileges. To execute this query it's necessary to have grant SHOW ROLES ON *.*. (ACCESS_DENIED), Stack trace (when copying this message, always include the lines below) без понятия, о чём это, но непохоже на причину проблем с кафкой

Alex Spiridonov
ну если бы он сразу с нуля и все партиции начал чи...

ну в общем КХ сам такое сделать не может. вам надо удостоверится что это не новая партиция, в смысле в кафке же можно добавить партиций

Alex Spiridonov
ещё бы понять, ЧТО там искать. Обычный лог вообще ...

<Error> DynamicQueryHandler: Code: 497. DB::Exception: default: Not enough privileges. To execute this query it's necessary to have grant SHOW ROLES ON *.*. (ACCESS_DENIED), Stack trace (when copying this message, always include the lines below) ну в логе видно кто и что пытался сделать

Alex-Spiridonov Автор вопроса
Denny [Altinity]
ну в общем КХ сам такое сделать не может. вам надо...

у меня в топике 12 партиций. Сразу после создания нового MV, минут через пять, я сделал запрос на min(offset), max(offset) по партициям и там всё было в порядке - все значения ненулевые с ожидаемым равномерным распределением. Оставил набираться информации (я ж делал для поиска дубликатов, по вашему совету) и вернулся через час. Тогда и выяснилось, что партиция 5 была считана уже начиная с 0

Alex Spiridonov
у меня в топике 12 партиций. Сразу после создания ...

>Тогда и выяснилось, что партиция 5 была считана уже начиная с 0 такого не должно быть, это какой-то баг

Alex-Spiridonov Автор вопроса
Denny [Altinity]
какая версия КХ и какие settings у kafka=engine ?

version: 21.11.3.6 engine = Kafka settings kafka_broker_list = 'kafka-04:9092,kafka-05:9092,kafka-06:9092', kafka_topic_list = 'Audit.Span', kafka_group_name = 'Audit.Clickhouse.Kafka.Engine', kafka_format = 'JSONEachRow', kafka_max_block_size = 1048576;

Alex-Spiridonov Автор вопроса
Denny [Altinity]
>Тогда и выяснилось, что партиция 5 была считана у...

да вот похоже на баг. Есть другая среда, где отработала похожая схема без такой ошибки (200М сообщений) но там: - версия 21.10.2.15 - кафка на одной ноде - в топике партиция 1, а не 12

Alex Spiridonov
да вот похоже на баг. Есть другая среда, где отраб...

у вас может настройки в конфиге КХ в теге <kafka> есть?

Alex-Spiridonov Автор вопроса
Denny [Altinity]
у вас может настройки в конфиге КХ в теге <kafka> ...

нет ничего - надобности вроде не возникало. Для зукипера есть, а по кафке нет ничего

Alex Spiridonov
нет ничего - надобности вроде не возникало. Для зу...

а TTL у топика в кафке есть? ищите в логе КХ "no offset stored" https://github.com/ClickHouse/ClickHouse/issues/20548

Alex-Spiridonov Автор вопроса
Denny [Altinity]
а TTL у топика в кафке есть? ищите в логе КХ "no ...

в кафке выставлено 24 часа для этого топика

Alex-Spiridonov Автор вопроса
Denny [Altinity]
а TTL у топика в кафке есть? ищите в логе КХ "no ...

> ищите в логе КХ "no offset stored" нет ни в .err.log ни .log

Alex-Spiridonov Автор вопроса
Denny [Altinity]
ищите в логе КХ "no offset stored"

встречается (довольно много): StorageKafka (inbox): Polled offset INVALID (topic: Audit.Span, partition: 5) но не только для партиции 5, а как-будто для всех. Не единичные, достаточно много есть и нормальные Polled offset с нормальными числовыми значениями это может что-то значить?

Alex Spiridonov
встречается (довольно много): StorageKafka (inbox)...

ну и что было этого? грепайте StorageKafka (inbox)

Alex-Spiridonov Автор вопроса
Denny [Altinity]
ну и что было этого? грепайте StorageKafka (inbox)

ложная тревога - это я утром на другой таблице и другом топике тренировался. Вообще не пересекаются с обсуждаемыми проблемными ни по БД в СН, ни по топику в кафке :(

Alex-Spiridonov Автор вопроса
Denny [Altinity]
ну и что было этого? грепайте StorageKafka (inbox)

просмотрел весь греп за сегодня (от подключения СН к кафке с утра до текущего момента) на Polled offset (span_inbox) для пятой партиции: значения всё время не убывающее. Т.е. значение смещения для это партиции по логам возрастало. По крайней мере сгрепленным таким образом Есть в самом начале (ещё с утра) INVALID, но это не прервало возрастанием. Дальше весь день - возрастание, включая момент, когда одно из MV, подключенных Kafka Engine таблице, выбрала почему-то данные из партиции 5 с нуля Туманная история. Завтра, скорее всего, снесу всё в СН, пересоздам заново и попробую воспроизвести опять

Alex Spiridonov
просмотрел весь греп за сегодня (от подключения СН...

>когда одно из MV тут какое-то недопонимание. MV не читает из kafka engine. kafka engine читает из топика что-то, кладет в буфер, и указатель на этот буфер (один и тот же) передается во все mat view

Alex-Spiridonov Автор вопроса
Denny [Altinity]
>когда одно из MV тут какое-то недопонимание. MV ...

да я именно так и понимаю вроде, может формулирую некорректно. просто информацию по смещениям у меня забирало только последнее MV. Два, которые были до этого, просто данные по целевым таблицам раскладывали.

Похожие вопросы

Обсуждают сегодня

30500 за редактор? )
Владимир
47
а через ESC-код ?
Alexey Kulakov
29
Чёт не понял, я ж правильной функцией воспользовался чтобы вывести отладочную информацию? но что-то она не ловится
notme
18
Добрый день! Скажите пожалуйста, а какие программы вы бы рекомендовали написать для того, чтобы научиться управлять памятью? Можно написать динамический массив, можно связный ...
Филипп
7
У меня есть функция где происходит это: write_bit(buffer, 1); write_bit(buffer, 0); write_bit(buffer, 1); write_bit(buffer, 1); write_bit(buffer, 1); w...
~
14
Недавно Google Project Zero нашёл багу в SQLite с помощью LLM, о чём достаточно было шумно в определённых интернетах, которые сопровождались рассказами, что скоро всех "ибешни...
Alex Sherbakov
5
Ребят в СИ можно реализовать ООП?
Николай
33
https://github.com/erlang/otp/blob/OTP-27.1/lib/kernel/src/logger_h_common.erl#L174 https://github.com/erlang/otp/blob/OTP-27.1/lib/kernel/src/logger_olp.erl#L76 15 лет назад...
Maksim Lapshin
20
Как передать управляющий символ в открытую через CreateProcess консоль? Собсна, есть процедура: procedure TRedirectThread.WriteData(Data: OEMString); var Written: Cardinal;...
Serjone
6
Всем привет! Имеется функция: function IsValidChar(ch: UTF8Char): Boolean; var i: Integer; ValidChars: AnsiString; begin ValidChars := 'abcdefghijklmnopqrstuvwxyzABCDE...
Евгений
44
Карта сайта