что не так ?
у меня путь kafka -> flink данные сохранены в avro
И на флинке пытаюсь их привести к модели на основании avro схемы.
Нашел несколько примером, сделал 1 в 1 .
(пример - https://stackoverflow.com/questions/38715286/how-to-decode-kafka-messages-using-avro-and-flink)
модель авро создается автоматом через avro-maven-plugin (тут нет никаких проблем)
Изучал документацию Flink они рекомендуют использовать для десериализации данных AvroDeserializationSchema
.forSpecific(class) , я уже и написал свой десериализатор
Но возникает такая ошибка , что на моей десериализаторе, что на флинковском, но проблема не в классе так как он плагином создается на основании схемы и схема верная я на Nifi протестировал там все разбирает обратно из авро
Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(Ljava/lang/Class;Ljava/util/ArrayList;Ljava/lang/reflect/ParameterizedType;Lorg/apache/flink/api/common/typeinfo/TypeInformation;Lorg/apache/flink/api/common/typeinfo/TypeInformation;)Lorg/apache/flink/api/common/typeinfo/TypeInformation;
at org.apache.flink.formats.avro.typeutils.AvroTypeInfo$PojoTypeExtractor.analyzePojo(AvroTypeInfo.java:103)
Я уже весь гугл облазил не нашел внятного ответа, что не нравится и что делать уже не знаю ....
AvroDeserializationSchema<EventLteLogPublicInfoTest> avroSchema =
new AvroDeserializationSchema<>(EventLteLogPublicInfoTest.class);
KafkaSource<EventLteLogPublicInfoTest> source = KafkaSource.<EventLteLogPublicInfoTest>builder()
.setProperties(props)
.setBootstrapServers("local:9092")
.setTopics("test")
.setGroupId("test")
.setStartingOffsets(OffsetsInitializer.earliest())
/*.setValueOnlyDeserializer(org.apache.flink.formats.avro.AvroDeserializationSchema
.forSpecific(EventLteLogPublicInfoTest.class))*/
.setValueOnlyDeserializer(avroSchema)
.build();
return source;
}
Вот сама настройка..
КТо может дать совет вытащить из топика ввиде строки могу, но мне надо в виде объекта
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<EventLteLogPublicInfoTest> dataStream = env.fromSource(ConfigFlink.config(),
WatermarkStrategy.forMonotonousTimestamps(),"Kafka");
dataStream.print();
env.execute();
ищи в какой версии библиотеки есть класс TypeExtractor с методом analyzePojo нужной сигнатуры и сравнивай с тем что у тебя в проекте. различается - ищи причину и устраняй. если сходится - ищи почему в рантайме другая версия
нашел ошибку @Override public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy, ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { return super.analyzePojo(clazz, typeHierarchy, parameterizedType, in1Type, in2Type); Method does not override method from its superclass 'analyzePojo(java.lang.reflect.Type, java.util.List<java.lang.reflect.Type>, org.apache.flink.api.common.typeinfo.TypeInformation<IN1>, org.apache.flink.api.common.typeinfo.TypeInformation<IN2>)' in 'org.apache.flink.api.java.typeutils.TypeExtractor' cannot be applied to '(java.lang.Class<OUT>, java.util.ArrayList<java.lang.reflect.Type>, java.lang.reflect.ParameterizedType, org.apache.flink.api.common.typeinfo.TypeInformation<IN1>, org.apache.flink.api.common.typeinfo.TypeInformation<IN2>)' Но у меня нет доступа к этому классу,как решить вопрос?
Обсуждают сегодня