source, но при этом коммитить оффсеты - чтобы при рестарте коннекта не молотить неподходящие записи снова. Как быть в этой ситуации, не изобретая велосипед? Есть ли какая-нибудь хитрая возможность отдать SourceRecord, но не записать в топик - например, key=null, key=null и т.п.? Пока как work-around видим добавление мета-топика и в случае пропуска source-данных делать 1 SourceRecord с этим служебным топиком, а не целевым.
как обычно, мой способ для kafka-connect - через задницу @SneakyThrows public void writeOffsetDirectly( Map<String, ?> partitionMap, Map<String, ?> offsetMap ) { Object offsetWriter = createOffsetWriter(offsetReader); synchronized (offsetWriter) { // put data to internal collection #data on(offsetWriter).call("offset", partitionMap, offsetMap); // transfer data to another internal collection, that is used in doFlush(), #toFlush on(offsetWriter).call("beginFlush"); // start async flush with callback the way kafka-connect does on(offsetWriter).call("doFlush", callback).get(); } }
Обсуждают сегодня