Доброго времени суток. Пробую играться с ksql есть топик, в

который kafka-connect’ом (debezium mysql) наливаются данные в json формате
создаю стрим

CREATE STREAM o (
OrderID INT KEY,
__version STRING
) WITH (
kafka_topic = 'orders',
value_format = 'json'
);

При попытке почитать из него select * from o emit changes
получаю в логах

[2021-01-31 22:28:11,684] WARN Exception caught during Deserialization, taskId: 0_0, topic: orders, partition: 0, offset: 1729 (org.apache.kafka.streams.processor.internals.StreamThread:36)
org.apache.kafka.common.errors.SerializationException: Error deserializing KAFKA message from topic: orders
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
[2021-01-31 22:28:11,684] WARN stream-thread [_confluent-ksql-default_transient_7690121861292502830_1612132084399-d7889c74-b9f0-42b8-ba9f-03019c4d2db0-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[ orders] partition=[0] offset=[1729] (org.apache.kafka.streams.processor.internals.RecordDeserializer:88)
org.apache.kafka.common.errors.SerializationException: Error deserializing KAFKA message from topic: orders
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
[2021-01-31 22:28:11,684] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Error deserializing KAFKA message from topic: orders","recordB64":null,"cause":["Size of data received by IntegerDeserializer is not 4"],"topic":" orders"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.7690121861292502830.KsqlTopic.Source.deserializer:44)

Если убрать ключ и оставить только OrderID INT, (без KEY) то читается нормально.

То же с таболицами. CREATE TABLE …. при создании с PRIMARY KEY не читается. Ощибки те же. В поле int нормальный, переполнения нет.
ЧЯДНТ?

5 ответов

21 просмотр

1) в каком формате льёт дебезиум? 2) какая версия ksqlDB? 3) что пишет print orders from beginning?

Oleksandr-Ryzhenko Автор вопроса
Vik Gamov
1) в каком формате льёт дебезиум? 2) какая версия ...

1. JSON. Применен ExtractRowTransformer, так что там без вложеннных структур. Пример соообщения в топике: {"OrderID":42501768,"__version":"1612153780532575455"} 2. confluentinc/ksqldb-server:0.14.0 confluentinc/ksqldb-cli:0.14.0 3. Key format: JSON or SESSION(KAFKA_INT) or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING Value format: JSON or KAFKA_STRING rowtime: 2021/01/29 08:59:23.328 Z, key: {"OrderID":42140921}, value: {"OrderID":42140921,"__version":"1611910763189080318"} rowtime: 2021/01/29 08:59:23.330 Z, key: {"OrderID":42140921}, value: {"OrderID":42140921,"__version":"1611910763320490304"} ……

Oleksandr-Ryzhenko Автор вопроса
Oleksandr Ryzhenko
1. JSON. Применен ExtractRowTransformer, так что т...

Сам докопался, что проблема в KEY_FORMAT. При создании стрима я его не указываю и он подставляется деволтным KAFKA. Дебезиум же пишет в ключ JSON, например: {"OrderID":42511627} но создание с KEY_FORMAT='JSON' приводит к другой ошибке при попытке чтения из стрима: [2021-02-01 10:41:04,832] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Failed to deserialize key from topic: orders. Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $","recordB64":null,"cause":["Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $","Can't convert type. sourceType: ObjectNode, requiredType: INTEGER"],"topic»:»orders»},»recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.6302525988885084951.KsqlTopic.Source.deserializer:44)

Oleksandr-Ryzhenko Автор вопроса
Vik Gamov
А что если long сделать orderid?

Помогло id string key и KEY_FORMAT=«KAFKA» Заметил в кондукторе, что в ключе сообщения string (который json, но все же изначально string). Пример ключа: {"OrderID":42547152} Отсюда вопрос №1: Можно ли при такой ситуации (в message key лежит строка с json-ом) в id талицы/стрима подставить значение поля из этого json-а? Вопрос №2. Я наверное не до конца понимаю суть таблицы KSQL. Подскажите плз в чем я заблуждаюсь. а) стрим - это собственно стрим данных, основанный на кафка топике; б) Таблица - грубо говоря, топик с последними состояниями сущностей по primary key в) таблицы можно кверить, как обычные мускуль таблицы через CLI либо REST API ( select * from table where id = 123 - вернет последнее состояние сущности с id = 123 из соответствующего топика). г) Под капотом у таблиц лежит compacted topic. 3) Почему когда я делаю запрос в таблицу, то получаю ошибку? ksql> select * from o_table where OrderID=42547941; Can't pull from O_TABLE as it's not a materialized table. See https://cnfl.io/queries for more info. Add EMIT CHANGES if you intended to issue a push query. В доке вроде как сказано, что к таблицам можно делать pull запрос на получение состояния сущности. И вот не понял в чем разница Table и Materialized view (которая по сути table, которая as select …) Сори за возможно делитантские вопросы, но я только погружаюсь в потоковое видение 🙂

Похожие вопросы

Обсуждают сегодня

Добрый вечер, Пока не совсем понимаю как наладить общение между телеграм ботом и ПО для работы с сим боксом. По самому боту так понял: - Нужен некий баланс, который можно поп...
Magic
6
сделал сайт, прикрутил в боте сайт, и виджет логина. как автоматически логинить пользователя в аккаунт(телеграм), при входе с бота?
Александра Чернивецкая
5
Объясните, пожалуйста, почему компилятор ругается на использование в условии неинициализированной переменной: int x; Task.Run(async () => { x = await somefunc(); }).Wait...
Александр
5
Ребят, подскажите, пожалуйста, почему в префиксе к ассетам, которые генерируются через фильтр | theme в шаблоне, стал вдруг появляться index.php? Вот так выглядит ссылка на а...
Виталий
1
Всем привет. Ребята, подскажите, пожалуйста. у ботов есть ограничение на отправку сообщений - 30 сообщений в секунду, эти ограничения накладываются на все сообщения? или на со...
Artem Stormageddon
4
Блин, ребята, сори за тупые вопросы. А можно ли как-то открыть вебапку по нажатию на кнопку в меню(которое появляется слева, команды)?
Artem Stormageddon
3
а плаксы из-под питона умеют только в комфортных условиях что-то выдавить из себя?)
Lencore
9
Но, может, есть уже проверенная? Наши требования такие: 1. Сообщения должны приходить из Инста в CRM оду 2. Должна быть возможность подключить несколько экаунтов Инстаграм. Р...
Alexander Sharoiko MSE / Александр Шаройко
13
Это может быть все-таки не флудвейт? у меня ботфазер принимает изменения и отображает даже что они изменились, на видео видно что он прислал якобы уже измененное описание, н...
OVERLINK
13
Коллеги, может знает кто, можно ли цвет бейджа счётчика в BackendMenu менять без бубнов?
Alex Blaze
3
Карта сайта