который kafka-connect’ом (debezium mysql) наливаются данные в json формате
создаю стрим
CREATE STREAM o (
OrderID INT KEY,
__version STRING
) WITH (
kafka_topic = 'orders',
value_format = 'json'
);
При попытке почитать из него select * from o emit changes
получаю в логах
[2021-01-31 22:28:11,684] WARN Exception caught during Deserialization, taskId: 0_0, topic: orders, partition: 0, offset: 1729 (org.apache.kafka.streams.processor.internals.StreamThread:36)
org.apache.kafka.common.errors.SerializationException: Error deserializing KAFKA message from topic: orders
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
[2021-01-31 22:28:11,684] WARN stream-thread [_confluent-ksql-default_transient_7690121861292502830_1612132084399-d7889c74-b9f0-42b8-ba9f-03019c4d2db0-StreamThread-1] task [0_0] Skipping record due to deserialization error. topic=[ orders] partition=[0] offset=[1729] (org.apache.kafka.streams.processor.internals.RecordDeserializer:88)
org.apache.kafka.common.errors.SerializationException: Error deserializing KAFKA message from topic: orders
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by IntegerDeserializer is not 4
[2021-01-31 22:28:11,684] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Error deserializing KAFKA message from topic: orders","recordB64":null,"cause":["Size of data received by IntegerDeserializer is not 4"],"topic":" orders"},"recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.7690121861292502830.KsqlTopic.Source.deserializer:44)
Если убрать ключ и оставить только OrderID INT, (без KEY) то читается нормально.
То же с таболицами. CREATE TABLE …. при создании с PRIMARY KEY не читается. Ощибки те же. В поле int нормальный, переполнения нет.
ЧЯДНТ?
1) в каком формате льёт дебезиум? 2) какая версия ksqlDB? 3) что пишет print orders from beginning?
1. JSON. Применен ExtractRowTransformer, так что там без вложеннных структур. Пример соообщения в топике: {"OrderID":42501768,"__version":"1612153780532575455"} 2. confluentinc/ksqldb-server:0.14.0 confluentinc/ksqldb-cli:0.14.0 3. Key format: JSON or SESSION(KAFKA_INT) or SESSION(KAFKA_STRING) or HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING Value format: JSON or KAFKA_STRING rowtime: 2021/01/29 08:59:23.328 Z, key: {"OrderID":42140921}, value: {"OrderID":42140921,"__version":"1611910763189080318"} rowtime: 2021/01/29 08:59:23.330 Z, key: {"OrderID":42140921}, value: {"OrderID":42140921,"__version":"1611910763320490304"} ……
Сам докопался, что проблема в KEY_FORMAT. При создании стрима я его не указываю и он подставляется деволтным KAFKA. Дебезиум же пишет в ключ JSON, например: {"OrderID":42511627} но создание с KEY_FORMAT='JSON' приводит к другой ошибке при попытке чтения из стрима: [2021-02-01 10:41:04,832] ERROR {"type":0,"deserializationError":{"target":"key","errorMessage":"Failed to deserialize key from topic: orders. Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $","recordB64":null,"cause":["Can't convert type. sourceType: ObjectNode, requiredType: INTEGER, path: $","Can't convert type. sourceType: ObjectNode, requiredType: INTEGER"],"topic»:»orders»},»recordProcessingError":null,"productionError":null,"serializationError":null,"kafkaStreamsThreadError":null} (processing.6302525988885084951.KsqlTopic.Source.deserializer:44)
А что если long сделать orderid?
Помогло id string key и KEY_FORMAT=«KAFKA» Заметил в кондукторе, что в ключе сообщения string (который json, но все же изначально string). Пример ключа: {"OrderID":42547152} Отсюда вопрос №1: Можно ли при такой ситуации (в message key лежит строка с json-ом) в id талицы/стрима подставить значение поля из этого json-а? Вопрос №2. Я наверное не до конца понимаю суть таблицы KSQL. Подскажите плз в чем я заблуждаюсь. а) стрим - это собственно стрим данных, основанный на кафка топике; б) Таблица - грубо говоря, топик с последними состояниями сущностей по primary key в) таблицы можно кверить, как обычные мускуль таблицы через CLI либо REST API ( select * from table where id = 123 - вернет последнее состояние сущности с id = 123 из соответствующего топика). г) Под капотом у таблиц лежит compacted topic. 3) Почему когда я делаю запрос в таблицу, то получаю ошибку? ksql> select * from o_table where OrderID=42547941; Can't pull from O_TABLE as it's not a materialized table. See https://cnfl.io/queries for more info. Add EMIT CHANGES if you intended to issue a push query. В доке вроде как сказано, что к таблицам можно делать pull запрос на получение состояния сущности. И вот не понял в чем разница Table и Materialized view (которая по сути table, которая as select …) Сори за возможно делитантские вопросы, но я только погружаюсь в потоковое видение 🙂
Обсуждают сегодня