materialized view .. to ... для доставки данных из топика в таблицу. Данные не идут, нигде никаких ошибок не вижу. Если делать вставку
insert into historyvalues select ... (запрос идентичный MV)
то данные вставляются.
Где искать проблему? Куда, в какие логи смотреть?
Пространно:
Создаем очередь
CREATE TABLE queue
(
value String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = '192.168.XX.XX:9092',
kafka_topic_list = 'topicQueueTest',
kafka_group_name = 'group1',
kafka_format = 'LineAsString';
Создаем matview
CREATE MATERIALIZED VIEW IF NOT EXISTS
HistoryValuesMV TO historyvalues
AS SELECT toInt64(b.idtag) as IdTag
, b.valueFloat as valueFloat
, b.valueString as valueString
, b.timestamp as timestamp
, now() as timestampWT
from (select a.idtag
, (data.1) as valueFloat
, (data.2) as valueString
, fromUnixTimestamp64Milli((data.3),'UTC') as timestamp
from
(select _key as idtag
, JSONExtract(value||']','Array(Tuple(VF Nullable(Float64),VS Nullable(String),TS Int64))') as data
from queue) a
array join a.data as data) b;
Данные не идут, нигде никаких ошибок не вижу. Если делать вставку
insert into historyvalues select ... (запрос идентичный MV)
то данные вставляются.
Где искать проблему? Куда, в какие логи смотреть?
выставить логи как Trace рестартануть сервер и искать что происходит через tail -f /var/log/clickhouse-server/clickhouse-server.log | grep -i kafka данные не идут в MV потому что скорее всего не попадают в ENGINE = Kafka туда они могут не попадать по разным причинам может у вас топик кривой может формат сообщений в kafka не соответствует таблице (но об этом должны быть записи в логе) может доступа к брокеру нет (тоже должны быть логи)
Данные в ENGINE = Kafka идут, проверено. Более того, повторяю "Если делать вставку insert into historyvalues select ... (запрос идентичный MV) то данные вставляются." То есть запрос из MV прекрасно отрабатывает, данные из кафки вычитываются.
что то сомневаюсь что у вас запрос правильный (select_key as idtag точно не может быть правильным запросом
Почему? _key - виртуальная колонка, ключ сообщения в кафке
какая версия КХ ? поля в таблице тоже в таком регистре названы? IdTag ? сделайте таблицу temp ... Engine = Log , создайте еще одно mv на нее TO таже самая таблица и вставьте строку в этом же формате asstring в temp
ClickHouse server version 20.12.3 да, все регистры такие. Сейчас попробую с temp
Всё прекрасно вставилось из temp :) insert into temp values ('1010101010','[{"VS":47879,"TS":1614320449000,"Q":192},{"VS":0.2,"TS":1614320449001,"Q":192}') INSERT INTO temp VALUES Query id: 0368e959-d6ab-4147-ab2f-4ad2ea4fdeb0 Ok. 1 rows in set. Elapsed: 0.034 sec. :) select * from historyvalues where IdTag = 1010101010; SELECT * FROM historyvalues WHERE IdTag = 1010101010 Query id: 0ef1cd00-f757-493e-8b8d-aaa919a26534 ┌──────IdTag─┬─valueFloat─┬─valueString─┬─valueDateTime─┬───────────────timestamp─┬─────────────timestampWT─┬─quality─┐ │ 1010101010 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ 2021-02-26 06:20:49.000 │ 2021-02-26 06:22:11.000 │ 192 │ │ 1010101010 │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ ᴺᵁᴸᴸ │ 2021-02-26 06:20:49.001 │ 2021-02-26 06:22:11.000 │ 192 │ └────────────┴────────────┴─────────────┴───────────────┴─────────────────────────┴─────────────────────────┴─────────┘ текст MV отличчается только заменой queue -> temp
Как вариант попробуйте создать mv без TO, а потом селектнуть напрямую из mv (на самом деле из его скрытой таблицы). Пробовали селект делать из таблицы с engine=Kafka? Причём такой какой пишите в mv и просто select *
Это первое, что мы попробовали. Из начального сообщения "Если делать вставку insert into historyvalues select ... (запрос идентичный MV) то данные вставляются."
Тогда думаю надо копать в сторону настроек сервера. Там есть периодичность вычитывания из кафки и размер блока, при котором происходит вычитывание даже если тайм-аут ещё не пришёл
у них в Kafka Таблицу все нормально читается у них по какой то причине у MV не попадает MV там не Kafka таблица... а обычный MergeTree что и странно
mv это просто триггер на инсерт, инициатор как раз таки таблица с кафкой, а чтение из неё автоматом (а следовательно и триггер) происходит согласно двум параметрам, про которые я написал. Если просто создать кафка таблицу, то такого автоматического чтения не будет
Проверить очень просто - если есть mv, то селект напрямую из кафки будет возвращать прерывистые данные относительно того что в топике, без mv вы будете всегда получать весь поток данных (все потому, что из кафки можно прочитать одни и те же данные только один раз)
В том то и фишка, что триггер вроде как не срабатывает. Непонятно почему. И сообщений о проблемах никаких
Поднимите в докере КХ той же версии с чистым конфигом и попробуйте вашу схему, можно ещё с другой версией проверить
а точнее можете сказать? честно говоря не понимаю про какие параметры вы говорите
"To improve performance, received messages are grouped into blocks the size of max_insert_block_size. If the block wasn’t formed within stream_flush_interval_ms milliseconds, the data will be flushed to the table regardless of the completeness of the block."
кстати да min_insert_block_size посмотрите и stream_flush_interval_ms SELECT * FROM system.settings WHERE name LIKE '%block_size%' OR name LIKE '%flush%' FORMAT Vertical;
Там если случайно задрать stream_flush_interval_ms то можно и не дождаться данных, если поток в топик не очень большой
у вас уровень логирования trace? В логе должны быть сообщения про пулл-комит кафки
Обсуждают сегодня