что не так ? 
                  
                  
                  
                  
                  
                  у меня путь kafka -> flink данные сохранены в avro
                  
                  
                  
                  
                  
                  И на флинке пытаюсь их привести к модели на основании avro схемы. 
                  
                  
                  Нашел несколько примером, сделал 1 в 1  .
                  
                  
                  (пример - https://stackoverflow.com/questions/38715286/how-to-decode-kafka-messages-using-avro-and-flink)
                  
                  
                  
                  
                  
                  модель авро создается автоматом через avro-maven-plugin (тут нет никаких проблем) 
                  
                  
                  Изучал документацию Flink они рекомендуют использовать для десериализации  данных AvroDeserializationSchema
                  
                  
                                          .forSpecific(class) , я уже и написал свой десериализатор 
                  
                  
                  Но возникает такая ошибка , что на моей десериализаторе, что на флинковском, но проблема не в классе так как он плагином создается на основании схемы и схема верная я на Nifi протестировал там все разбирает обратно из авро 
                  
                  
                  
                  
                  
                  Exception in thread "main" java.lang.NoSuchMethodError: org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(Ljava/lang/Class;Ljava/util/ArrayList;Ljava/lang/reflect/ParameterizedType;Lorg/apache/flink/api/common/typeinfo/TypeInformation;Lorg/apache/flink/api/common/typeinfo/TypeInformation;)Lorg/apache/flink/api/common/typeinfo/TypeInformation;
                  
                  
                    at org.apache.flink.formats.avro.typeutils.AvroTypeInfo$PojoTypeExtractor.analyzePojo(AvroTypeInfo.java:103)
                  
                  
                  
                  
                  
                  Я уже весь гугл облазил не нашел внятного ответа, что не нравится и что делать уже не знаю ....
                  
                  
                  
                  
                  
                  AvroDeserializationSchema<EventLteLogPublicInfoTest> avroSchema =
                  
                  
                                  new AvroDeserializationSchema<>(EventLteLogPublicInfoTest.class);
                  
                  
                          KafkaSource<EventLteLogPublicInfoTest> source = KafkaSource.<EventLteLogPublicInfoTest>builder()
                  
                  
                                  .setProperties(props)
                  
                  
                                  .setBootstrapServers("local:9092")
                  
                  
                                  .setTopics("test")
                  
                  
                                  .setGroupId("test")
                  
                  
                                  .setStartingOffsets(OffsetsInitializer.earliest())
                  
                  
                                  /*.setValueOnlyDeserializer(org.apache.flink.formats.avro.AvroDeserializationSchema
                  
                  
                                          .forSpecific(EventLteLogPublicInfoTest.class))*/
                  
                  
                                  .setValueOnlyDeserializer(avroSchema)
                  
                  
                                  .build();
                  
                  
                          return source;
                  
                  
                      }
                  
                  
                  
                  
                  
                  Вот сама настройка.. 
                  
                  
                  КТо может дать совет вытащить из топика ввиде строки могу, но мне надо в виде объекта
                  
                  
                  
                  
                  
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                  
                  
                          DataStream<EventLteLogPublicInfoTest> dataStream = env.fromSource(ConfigFlink.config(),
                  
                  
                                  WatermarkStrategy.forMonotonousTimestamps(),"Kafka");
                  
                  
                          dataStream.print();
                  
                  
                          env.execute();
                  
                  
                
ищи в какой версии библиотеки есть класс TypeExtractor с методом analyzePojo нужной сигнатуры и сравнивай с тем что у тебя в проекте. различается - ищи причину и устраняй. если сходится - ищи почему в рантайме другая версия
нашел ошибку @Override public <OUT, IN1, IN2> TypeInformation<OUT> analyzePojo(Class<OUT> clazz, ArrayList<Type> typeHierarchy, ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) { return super.analyzePojo(clazz, typeHierarchy, parameterizedType, in1Type, in2Type); Method does not override method from its superclass 'analyzePojo(java.lang.reflect.Type, java.util.List<java.lang.reflect.Type>, org.apache.flink.api.common.typeinfo.TypeInformation<IN1>, org.apache.flink.api.common.typeinfo.TypeInformation<IN2>)' in 'org.apache.flink.api.java.typeutils.TypeExtractor' cannot be applied to '(java.lang.Class<OUT>, java.util.ArrayList<java.lang.reflect.Type>, java.lang.reflect.ParameterizedType, org.apache.flink.api.common.typeinfo.TypeInformation<IN1>, org.apache.flink.api.common.typeinfo.TypeInformation<IN2>)' Но у меня нет доступа к этому классу,как решить вопрос?
Обсуждают сегодня