170 похожих чатов

Всем добрый день! Кто может подсказать уже руки опускаются не знаю,

что не так ?

у меня путь kafka -> flink данные сохранены в avro

И на флинке пытаюсь их привести к модели на основании avro схемы.
Нашел несколько примером, сделал 1 в 1 .
(пример - https://stackoverflow.com/questions/38715286/how-to-decode-kafka-messages-using-avro-and-flink)

модель авро создается автоматом через avro-maven-plugin (тут нет никаких проблем)
Изучал документацию Flink они рекомендуют использовать для десериализации данных AvroDeserializationSchema
.forSpecific(class) , я уже и написал свой десериализатор
Но возникает такая ошибка , что на моей десериализаторе, что на флинковском, но проблема не в классе так как он плагином создается на основании схемы и схема верная я на Nifi протестировал там все разбирает обратно из авро

Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(Ljava/lang/Class;Ljava/util/ArrayList;Ljava/lang/reflect/ParameterizedType;Lorg/apache/flink/api/common/typeinfo/TypeInformation;Lorg/apache/flink/api/common/typeinfo/TypeInformation;)Lorg/apache/flink/api/common/typeinfo/TypeInformation;
at org.apache.flink.formats.avro.typeutils.AvroTypeInfo$PojoTypeExtractor.analyzePojo(AvroTypeInfo.java:103)

Я уже весь гугл облазил не нашел внятного ответа, что не нравится и что делать уже не знаю ....

AvroDeserializationSchema<EventLteLogPublicInfoTest> avroSchema =
new AvroDeserializationSchema<>(EventLteLogPublicInfoTest.class);
KafkaSource<EventLteLogPublicInfoTest> source = KafkaSource.<EventLteLogPublicInfoTest>builder()
.setProperties(props)
.setBootstrapServers("local:9092")
.setTopics("test")
.setGroupId("test")
.setStartingOffsets(OffsetsInitializer.earliest())
/*.setValueOnlyDeserializer(org.apache.flink.formats.avro.AvroDeserializationSchema
.forSpecific(EventLteLogPublicInfoTest.class))*/
.setValueOnlyDeserializer(avroSchema)
.build();
return source;
}

Вот сама настройка..
КТо может дать совет вытащить из топика ввиде строки могу, но мне надо в виде объекта

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<EventLteLogPublicInfoTest> dataStream = env.fromSource(ConfigFlink.config(),
WatermarkStrategy.forMonotonousTimestamps(),"Kafka");
dataStream.print();
env.execute();

2 ответов

18 просмотров

ищи в какой версии библиотеки есть класс TypeExtractor с методом analyzePojo нужной сигнатуры и сравнивай с тем что у тебя в проекте. различается - ищи причину и устраняй. если сходится - ищи почему в рантайме другая версия

Илья-Шапорто Автор вопроса
Alexandr ∨∧‾ Emelyanov
ищи в какой версии библиотеки есть класс TypeExtra...

нашел ошибку @Override public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy, ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { return super.analyzePojo(clazz, typeHierarchy, parameterizedType, in1Type, in2Type); Method does not override method from its superclass 'analyzePojo(java.lang.reflect.Type, java.util.List<java.lang.reflect.Type>, org.apache.flink.api.common.typeinfo.TypeInformation<IN1>, org.apache.flink.api.common.typeinfo.TypeInformation<IN2>)' in 'org.apache.flink.api.java.typeutils.TypeExtractor' cannot be applied to '(java.lang.Class<OUT>, java.util.ArrayList<java.lang.reflect.Type>, java.lang.reflect.ParameterizedType, org.apache.flink.api.common.typeinfo.TypeInformation<IN1>, org.apache.flink.api.common.typeinfo.TypeInformation<IN2>)' Но у меня нет доступа к этому классу,как решить вопрос?

Похожие вопросы

Обсуждают сегодня

Господа, а что сейчас вообще с рынком труда на делфи происходит? Какова ситуация?
Rꙮman Yankꙮvsky
29
А вообще, что может смущать в самой Julia - бы сказал, что нет единого стандартного подхода по многим моментам, поэтому многое выглядит как "хаки" и произвол. Короче говоря, с...
Viktor G.
2
30500 за редактор? )
Владимир
47
а через ESC-код ?
Alexey Kulakov
29
Чёт не понял, я ж правильной функцией воспользовался чтобы вывести отладочную информацию? но что-то она не ловится
notme
18
У меня есть функция где происходит это: write_bit(buffer, 1); write_bit(buffer, 0); write_bit(buffer, 1); write_bit(buffer, 1); write_bit(buffer, 1); w...
~
14
Добрый день! Скажите пожалуйста, а какие программы вы бы рекомендовали написать для того, чтобы научиться управлять памятью? Можно написать динамический массив, можно связный ...
Филипп
7
Недавно Google Project Zero нашёл багу в SQLite с помощью LLM, о чём достаточно было шумно в определённых интернетах, которые сопровождались рассказами, что скоро всех "ибешни...
Alex Sherbakov
5
Ребят в СИ можно реализовать ООП?
Николай
33
https://github.com/erlang/otp/blob/OTP-27.1/lib/kernel/src/logger_h_common.erl#L174 https://github.com/erlang/otp/blob/OTP-27.1/lib/kernel/src/logger_olp.erl#L76 15 лет назад...
Maksim Lapshin
20
Карта сайта