184 похожих чатов

Привет! Подскажите пжл по интеграции с Kafka. Попробовал узнать на SO,

но там пока без ответа https://stackoverflow.com/questions/71426336/clickhouse-kafka-engine-on-cluster

Продублирую сюда:
Запускаю кластер кх и кафку в докер-компоуз https://github.com/apanasevich/clickhouse-etl-cluster под Win10

Создаю реплицируемую таблицу, таблицу кафки и вью:

CREATE TABLE tmp ON CLUSTER cluster
(
`event_timestamp` DateTime64(3, 'Europe/Moscow'),
`account_id` Int32,
`session` String,
`type` Int16
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/etl/tmp', '{replica}')
PARTITION BY toMonday(event_timestamp)
ORDER BY (account_id, event_timestamp)
TTL toDateTime(event_timestamp) + toIntervalDay(90);

CREATE TABLE tmp_kafka_datasource
(
`topic_data` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:29092', kafka_topic_list = 'Tmp',
kafka_group_name = 'clickhouse_etl_group', kafka_format = 'JSONAsString', kafka_num_consumers = 1;

CREATE MATERIALIZED VIEW tmp_consumer TO tmp
(
`event_timestamp` DateTime64(3),
`account_id` Int32,
`session` String,
`type` Int16
) AS
SELECT
fromUnixTimestamp64Milli(JSONExtract(topic_data, 'time', 'Int64')) AS event_timestamp,
toInt32(JSON_VALUE(topic_data, '$.data.accountId')) AS account_id,
JSON_VALUE(topic_data, '$.data.session') AS session,
toInt16(JSON_VALUE(topic_data, '$.data.type')) AS type
FROM tmp_kafka_datasource;

Проблема - в табличке нет данных, хотя для заданной консюмер группы в кафке нет лагов, т.е. данные читаюся. В том числе они читаются напрямую из таблицы кафка (если удалить вью).

Если сделать insert в реплицируемую таблицу, то данные вставляются и реплицируются.
Если создавать таблицу как не реплицируемую (ENGINE=MergeTree), то вся схема интеграции работает тоже.

Что не так сделано?

В логах только это:

2022.03.11 07:39:54.519652 [ 1 ] {} <Warning> Application: Listen [::]:9005 failed: Poco::Exception. Code: 1000, e.code() = 0, DNS error: EAI: Address family for hostname not supported (version 21.11.4.14 (official build)). If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> . Example for disabled IPv4: <listen_host>::</listen_host>
2022.03.11 07:39:54.641049 [ 249 ] {} <Error> virtual bool DB::DDLWorker::initializeMainThread(): Code: 999. Coordination::Exception: All connection tries failed while connecting to ZooKeeper. nodes: 172.27.0.2:2181
Poco::Exception. Code: 1000, e.code() = 111, Connection refused (version 21.11.4.14 (official build)), 172.27.0.2:2181
Poco::Exception. Code: 1000, e.code() = 111, Connection refused (version 21.11.4.14 (official build)), 172.27.0.2:2181
Poco::Exception. Code: 1000, e.code() = 111, Connection refused (version 21.11.4.14 (official build)), 172.27.0.2:2181
(Connection loss). (KEEPER_EXCEPTION), Stack trace (when copying this message, always include the lines below):

5 ответов

61 просмотр

Кажется, тут кафка не причём. Поэкспериментируйте просто с replicated таблицей. Как-будто с зукипером не может сн соединиться, соответственно не может проводить репликацию

Dmitrii-Apanasevich Автор вопроса
Alex Spiridonov
Кажется, тут кафка не причём. Поэкспериментируйте ...

Да, с кафкой всё ок. Для обычной MergeTree таблицы всё работает. А для реплицируемой - не работает вставка из MV, а вот вставка insert-ом работает. Пробовал другие реплицируемые таблицы создавать и делать там insert - тоже работает. Что ещё можно проверить?

а зачем вы дублируете описание tmp таблицы внутри MV? Не думаю что проблема тут, но я так обычно не делаю - это просто избыточно и некрасиво. Но может и ломается что-то.

Dmitrii-Apanasevich Автор вопроса
Boris
а зачем вы дублируете описание tmp таблицы внутри ...

Там нет дублирования. Разбирается JSON с вложенными структурами на части-столбцы

Dmitrii-Apanasevich Автор вопроса
Boris
а зачем вы дублируете описание tmp таблицы внутри ...

Сегодня свежим взглядом посмотрел и понял о чём вы: да, это явно лишнее. Взгляд уже замылился, поэтому не сразу понял замечание. Если убрать, то да: работает! Странно только, что для обычного MergeTree оно работало и так. Спасибо большое за подсказку!

Похожие вопросы

Обсуждают сегодня

Господа, а что сейчас вообще с рынком труда на делфи происходит? Какова ситуация?
Rꙮman Yankꙮvsky
29
А вообще, что может смущать в самой Julia - бы сказал, что нет единого стандартного подхода по многим моментам, поэтому многое выглядит как "хаки" и произвол. Короче говоря, с...
Viktor G.
2
30500 за редактор? )
Владимир
47
а через ESC-код ?
Alexey Kulakov
29
Чёт не понял, я ж правильной функцией воспользовался чтобы вывести отладочную информацию? но что-то она не ловится
notme
18
У меня есть функция где происходит это: write_bit(buffer, 1); write_bit(buffer, 0); write_bit(buffer, 1); write_bit(buffer, 1); write_bit(buffer, 1); w...
~
14
Добрый день! Скажите пожалуйста, а какие программы вы бы рекомендовали написать для того, чтобы научиться управлять памятью? Можно написать динамический массив, можно связный ...
Филипп
7
Недавно Google Project Zero нашёл багу в SQLite с помощью LLM, о чём достаточно было шумно в определённых интернетах, которые сопровождались рассказами, что скоро всех "ибешни...
Alex Sherbakov
5
Ребят в СИ можно реализовать ООП?
Николай
33
https://github.com/erlang/otp/blob/OTP-27.1/lib/kernel/src/logger_h_common.erl#L174 https://github.com/erlang/otp/blob/OTP-27.1/lib/kernel/src/logger_olp.erl#L76 15 лет назад...
Maksim Lapshin
20
Карта сайта