но там пока без ответа https://stackoverflow.com/questions/71426336/clickhouse-kafka-engine-on-cluster
Продублирую сюда:
Запускаю кластер кх и кафку в докер-компоуз https://github.com/apanasevich/clickhouse-etl-cluster под Win10
Создаю реплицируемую таблицу, таблицу кафки и вью:
CREATE TABLE tmp ON CLUSTER cluster
(
`event_timestamp` DateTime64(3, 'Europe/Moscow'),
`account_id` Int32,
`session` String,
`type` Int16
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/etl/tmp', '{replica}')
PARTITION BY toMonday(event_timestamp)
ORDER BY (account_id, event_timestamp)
TTL toDateTime(event_timestamp) + toIntervalDay(90);
CREATE TABLE tmp_kafka_datasource
(
`topic_data` String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:29092', kafka_topic_list = 'Tmp',
kafka_group_name = 'clickhouse_etl_group', kafka_format = 'JSONAsString', kafka_num_consumers = 1;
CREATE MATERIALIZED VIEW tmp_consumer TO tmp
(
`event_timestamp` DateTime64(3),
`account_id` Int32,
`session` String,
`type` Int16
) AS
SELECT
fromUnixTimestamp64Milli(JSONExtract(topic_data, 'time', 'Int64')) AS event_timestamp,
toInt32(JSON_VALUE(topic_data, '$.data.accountId')) AS account_id,
JSON_VALUE(topic_data, '$.data.session') AS session,
toInt16(JSON_VALUE(topic_data, '$.data.type')) AS type
FROM tmp_kafka_datasource;
Проблема - в табличке нет данных, хотя для заданной консюмер группы в кафке нет лагов, т.е. данные читаюся. В том числе они читаются напрямую из таблицы кафка (если удалить вью).
Если сделать insert в реплицируемую таблицу, то данные вставляются и реплицируются.
Если создавать таблицу как не реплицируемую (ENGINE=MergeTree), то вся схема интеграции работает тоже.
Что не так сделано?
В логах только это:
2022.03.11 07:39:54.519652 [ 1 ] {} <Warning> Application: Listen [::]:9005 failed: Poco::Exception. Code: 1000, e.code() = 0, DNS error: EAI: Address family for hostname not supported (version 21.11.4.14 (official build)). If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> . Example for disabled IPv4: <listen_host>::</listen_host>
2022.03.11 07:39:54.641049 [ 249 ] {} <Error> virtual bool DB::DDLWorker::initializeMainThread(): Code: 999. Coordination::Exception: All connection tries failed while connecting to ZooKeeper. nodes: 172.27.0.2:2181
Poco::Exception. Code: 1000, e.code() = 111, Connection refused (version 21.11.4.14 (official build)), 172.27.0.2:2181
Poco::Exception. Code: 1000, e.code() = 111, Connection refused (version 21.11.4.14 (official build)), 172.27.0.2:2181
Poco::Exception. Code: 1000, e.code() = 111, Connection refused (version 21.11.4.14 (official build)), 172.27.0.2:2181
(Connection loss). (KEEPER_EXCEPTION), Stack trace (when copying this message, always include the lines below):
Кажется, тут кафка не причём. Поэкспериментируйте просто с replicated таблицей. Как-будто с зукипером не может сн соединиться, соответственно не может проводить репликацию
Да, с кафкой всё ок. Для обычной MergeTree таблицы всё работает. А для реплицируемой - не работает вставка из MV, а вот вставка insert-ом работает. Пробовал другие реплицируемые таблицы создавать и делать там insert - тоже работает. Что ещё можно проверить?
а зачем вы дублируете описание tmp таблицы внутри MV? Не думаю что проблема тут, но я так обычно не делаю - это просто избыточно и некрасиво. Но может и ломается что-то.
Там нет дублирования. Разбирается JSON с вложенными структурами на части-столбцы
Сегодня свежим взглядом посмотрел и понял о чём вы: да, это явно лишнее. Взгляд уже замылился, поэтому не сразу понял замечание. Если убрать, то да: работает! Странно только, что для обычного MergeTree оно работало и так. Спасибо большое за подсказку!
Обсуждают сегодня