топик "А" с объектами класса Aclass делаю groupByKey().windowedBy().aggregate().toStream().map() в map преобразую результат аггрегации в Bclass и потом отправляю в топик B.
Проблема в том, что в сообщениях топика B, остается Headers( __TypeId__ = Aclass)
И листенер топика B пытается сериализовать Bclass в Aclass.
Вопрос, что делаю не так?
А есть ли какие-то причины/сложности, из-за которых в map нельзя поправить header, чтобы в TypeId был Bclass?
Я разобрался, там была проблема в обертке спринга над кафкой. продюсер писал через спринговый JsonSerializer и добавлятл TypedId это (я так понял чисто спринговый подход, чтоб ObjectMapper мог понять что в велью лежит) а в кафка стримс, там упрощенный кастомный Serde без спринга был, и хедер просто передавался дальше. (это дефолтное поведение в стримс, что выглядит не очень логичным) А дальше опять консюмер спринговый со своим JsonDeserializer который смотрел в TypeId и ппадал от несоответствия между значением хедера и что в байтмассиве лежит.
map вроде более высокоуровневая штука и не дает с хедерами работать, для этого Processor API нужно, скорей всего, но глубо не копал. (ну или в сериалайзере это делать)
Да, похоже на то, только key, value
Обсуждают сегодня