170 похожих чатов

Всем добрый день! Кто может подсказать уже руки опускаются не знаю,

что не так ?

у меня путь kafka -> flink данные сохранены в avro

И на флинке пытаюсь их привести к модели на основании avro схемы.
Нашел несколько примером, сделал 1 в 1 .
(пример - https://stackoverflow.com/questions/38715286/how-to-decode-kafka-messages-using-avro-and-flink)

модель авро создается автоматом через avro-maven-plugin (тут нет никаких проблем)
Изучал документацию Flink они рекомендуют использовать для десериализации данных AvroDeserializationSchema
.forSpecific(class) , я уже и написал свой десериализатор
Но возникает такая ошибка , что на моей десериализаторе, что на флинковском, но проблема не в классе так как он плагином создается на основании схемы и схема верная я на Nifi протестировал там все разбирает обратно из авро

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(Ljava/lang/Class;Ljava/util/ArrayList;Ljava/lang/reflect/ParameterizedType;Lorg/apache/flink/api/common/typeinfo/TypeInformation;Lorg/apache/flink/api/common/typeinfo/TypeInformation;)Lorg/apache/flink/api/common/typeinfo/TypeInformation;
at org.apache.flink.formats.avro.typeutils.AvroTypeInfo$PojoTypeExtractor.analyzePojo(AvroTypeInfo.java:103)

Я уже весь гугл облазил не нашел внятного ответа, что не нравится и что делать уже не знаю ....

AvroDeserializationSchema<EventLteLogPublicInfoTest> avroSchema =
new AvroDeserializationSchema<>(EventLteLogPublicInfoTest.class);
KafkaSource<EventLteLogPublicInfoTest> source = KafkaSource.<EventLteLogPublicInfoTest>builder()
.setProperties(props)
.setBootstrapServers("local:9092")
.setTopics("test")
.setGroupId("test")
.setStartingOffsets(OffsetsInitializer.earliest())
/*.setValueOnlyDeserializer(org.apache.flink.formats.avro.AvroDeserializationSchema
.forSpecific(EventLteLogPublicInfoTest.class))*/
.setValueOnlyDeserializer(avroSchema)
.build();
return source;
}

Вот сама настройка..
КТо может дать совет вытащить из топика ввиде строки могу, но мне надо в виде объекта

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<EventLteLogPublicInfoTest> dataStream = env.fromSource(ConfigFlink.config(),
WatermarkStrategy.forMonotonousTimestamps(),"Kafka");
dataStream.print();
env.execute();

2 ответов

8 просмотров

ищи в какой версии библиотеки есть класс TypeExtractor с методом analyzePojo нужной сигнатуры и сравнивай с тем что у тебя в проекте. различается - ищи причину и устраняй. если сходится - ищи почему в рантайме другая версия

Илья-Шапорто Автор вопроса
Alexandr ∨∧‾ Emelyanov
ищи в какой версии библиотеки есть класс TypeExtra...

нашел ошибку @Override public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy, ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { return super.analyzePojo(clazz, typeHierarchy, parameterizedType, in1Type, in2Type); Method does not override method from its superclass 'analyzePojo(java.lang.reflect.Type, java.util.List<java.lang.reflect.Type>, org.apache.flink.api.common.typeinfo.TypeInformation<IN1>, org.apache.flink.api.common.typeinfo.TypeInformation<IN2>)' in 'org.apache.flink.api.java.typeutils.TypeExtractor' cannot be applied to '(java.lang.Class<OUT>, java.util.ArrayList<java.lang.reflect.Type>, java.lang.reflect.ParameterizedType, org.apache.flink.api.common.typeinfo.TypeInformation<IN1>, org.apache.flink.api.common.typeinfo.TypeInformation<IN2>)' Но у меня нет доступа к этому классу,как решить вопрос?

Похожие вопросы

Обсуждают сегодня

Типа вызывать GetParent и проверять на соответствие GetModuleHandle?
The Bird of Hermes
67
Всем привет! Кто нибудь парсил в ручную JSON без библиотек и фреймворков? Есть может ссылки на оптимальный алгоритмы работы с текстом и примеры таких парсеров?
Lem
27
Do any of you guys have interesting projects one could join? I'm a Middle Full-Stack developer (JS/TS, React & Node)
Lev Shapiro
40
Есть сайт. Там была древняя версия эво. Стоял плагин, который каждый коммент в Jot делал отдельной страницей. После обновления все слетело, теперь старница открывается отдельн...
Artem
1
$res = json_decode($наша строка из респонса); $res1 = array_map(fn($o) => $o->name, $res->breadcrumbs[0]->entities); Как такое будет на Хаскеле?.. В начале весь джейсон, в ко...
Хаскель Моисеевич Гопник
27
Добрый день. А shovel'ы можно как-то сконфигурировать в definitions.json? Пробовал что-то вроде: { "users": [ { "name": "agent", "password_hash": "RBCbTzQd...
Aleksey
1
Вопрос по диагностике ошибок (я знаю в чем, в данном конкретном примере, я знаю, как исправить, пример модельный, понятно, что в реальности бывает намного запутаннее). module...
ⰄⰎⰋⰐⰐⰑⰛⰤⰧⰧⰩⰄ ⰊⰑⰁⰓⰡⰛⰦⰕⰫ
11
А чем вам питонисты не угодили?😂
.
79
В чем сила брат, в NASM или FASM?
Isaac Kleiner
18
Есть какой-нибудь для Delphi/FPC T*Compression(Decompression)Stream на базе LZ4/Zstd/любой другой быстрый(и хорошо сжимающий) алгоритм А ещё лучше в pure pascal А ещё лучше од...
notme
52
Карта сайта