Доброго времени суток. Пробую играться с ksql есть топик, в

который kafka-connect’ом (debezium mysql) наливаются данные в json формате
создаю стрим

CREATE STREAM o (
OrderID INT KEY,
__version STRING
) WITH (
kafka_topic = 'orders',
value_format = 'json'
);

При попытке почитать из него select * from o emit changes
получаю в логах

[2021-01-31 22:28:11,684] WARN Exception caught during Deserialization, taskId: 0_0, topic: orders, partition: 0, offset: 1729 (org.apache.kafka.streams.processor.internals.StreamThread:36)
org.apache.kafka.common.errors.SerializationException: Error deserializing KAFKA message from topic: orders
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
[2021-01-31 22:28:11,684] WARN stream-thread [_confluent-ksql-default_transient_7690121861292502830_1612132084399-d7889c74-b9f0-42b8-ba9f-03019c4d2db0-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[ orders] partition=[0] offset=[1729] (org.apache.kafka.streams.processor.internals.RecordDeserializer:88)
org.apache.kafka.common.errors.SerializationException: Error deserializing KAFKA message from topic: orders
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
[2021-01-31 22:28:11,684] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Error deserializing KAFKA message from topic: orders","recordB64":null,"cause":["Size of data received by IntegerDeserializer is not 4"],"topic":" orders"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.7690121861292502830.KsqlTopic.Source.deserializer:44)

Если убрать ключ и оставить только OrderID INT, (без KEY) то читается нормально.

То же с таболицами. CREATE TABLE …. при создании с PRIMARY KEY не читается. Ощибки те же. В поле int нормальный, переполнения нет.
ЧЯДНТ?

5 ответов

16 просмотров

1) в каком формате льёт дебезиум? 2) какая версия ksqlDB? 3) что пишет print orders from beginning?

Oleksandr-Ryzhenko Автор вопроса
Vik Gamov
1) в каком формате льёт дебезиум? 2) какая версия ...

1. JSON. Применен ExtractRowTransformer, так что там без вложеннных структур. Пример соообщения в топике: {"OrderID":42501768,"__version":"1612153780532575455"} 2. confluentinc/ksqldb-server:0.14.0 confluentinc/ksqldb-cli:0.14.0 3. Key format: JSON or SESSION(KAFKA_INT) or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING Value format: JSON or KAFKA_STRING rowtime: 2021/01/29 08:59:23.328 Z, key: {"OrderID":42140921}, value: {"OrderID":42140921,"__version":"1611910763189080318"} rowtime: 2021/01/29 08:59:23.330 Z, key: {"OrderID":42140921}, value: {"OrderID":42140921,"__version":"1611910763320490304"} ……

Oleksandr-Ryzhenko Автор вопроса
Oleksandr Ryzhenko
1. JSON. Применен ExtractRowTransformer, так что т...

Сам докопался, что проблема в KEY_FORMAT. При создании стрима я его не указываю и он подставляется деволтным KAFKA. Дебезиум же пишет в ключ JSON, например: {"OrderID":42511627} но создание с KEY_FORMAT='JSON' приводит к другой ошибке при попытке чтения из стрима: [2021-02-01 10:41:04,832] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Failed to deserialize key from topic: orders. Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $","recordB64":null,"cause":["Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $","Can't convert type. sourceType: ObjectNode, requiredType: INTEGER"],"topic»:»orders»},»recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.6302525988885084951.KsqlTopic.Source.deserializer:44)

Oleksandr-Ryzhenko Автор вопроса
Vik Gamov
А что если long сделать orderid?

Помогло id string key и KEY_FORMAT=«KAFKA» Заметил в кондукторе, что в ключе сообщения string (который json, но все же изначально string). Пример ключа: {"OrderID":42547152} Отсюда вопрос №1: Можно ли при такой ситуации (в message key лежит строка с json-ом) в id талицы/стрима подставить значение поля из этого json-а? Вопрос №2. Я наверное не до конца понимаю суть таблицы KSQL. Подскажите плз в чем я заблуждаюсь. а) стрим - это собственно стрим данных, основанный на кафка топике; б) Таблица - грубо говоря, топик с последними состояниями сущностей по primary key в) таблицы можно кверить, как обычные мускуль таблицы через CLI либо REST API ( select * from table where id = 123 - вернет последнее состояние сущности с id = 123 из соответствующего топика). г) Под капотом у таблиц лежит compacted topic. 3) Почему когда я делаю запрос в таблицу, то получаю ошибку? ksql> select * from o_table where OrderID=42547941; Can't pull from O_TABLE as it's not a materialized table. See https://cnfl.io/queries for more info. Add EMIT CHANGES if you intended to issue a push query. В доке вроде как сказано, что к таблицам можно делать pull запрос на получение состояния сущности. И вот не понял в чем разница Table и Materialized view (которая по сути table, которая as select …) Сори за возможно делитантские вопросы, но я только погружаюсь в потоковое видение 🙂

Похожие вопросы

Обсуждают сегодня

Ребята, всем привет. Подскажите, пожалуйста, можно ли как-то через бота понять, что этого бота добавили в группу\канал и выдали ему права администратора?
Artem Stormageddon
9
Это переведённый текст с английского. Я не говорю на русском, но могу использовать переводчик Телеграм. Приветствую! Я начинающий веб-разработчик и все еще учусь. В настояще...
𐩱𐩪𐩣𐩱𐩲𐩺𐩡
2
А не хотим ли мы развлечься? 😉 Но так чтобы с пользой для наших профессиональных навыков?? 👨‍🎓👩‍🎓 Предлагаю на октябрь запланировать тестовый запуск новой командной игры "Игр...
Andrii Kurdiumov
2
Привет всем! Почему этот код не срабатывает при добавлении или удалении пользователя из чата? bot.on('chat_member', async (ctx) => { console.log(ctx); }) bot.launch({allo...
Alexander
5
у кого сколько оперативы на базе данных ?
АДИЛЬБЕК
4
Через бот апи возможно получить ID стикерпака? Не ссылку.
Vexylon [АФК до 09.09]
5
Привет Хочу сделать аналог iCloud’а для своих проектов, чтобы пользовательская информация хранилась в облаке, была доступна во всех сервисах, её можно было подсасывать везде)...
Виталий
9
В тг можно спарсить всех кто пишет в группе? Если список участников скрыт
S
3
код Event::listen('cms.page.display', function (&$content, $slug, $page, $html) { if (is_object($content)) { dump($content); } else { dump($s...
Point 111
3
Ребят, а двух-факторку для плагина Users и для бэкенда октября кто-то прикручивал? Поделитесь опытом
Constantine Anikin
4
Карта сайта