чтобы в нём отбрасывались сообщения по ключу, если в топике уже есть сообщение с таким же ключом?
Привет! можно вот так реализовать: private void uniqueByKey(KStream<String, String> stream) { final String markForFirst = "!!!#"; stream .peek((key, value) -> log.debug("INPUT: {} -> {}", key, value)) .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) .reduce((first, second) -> { log.debug("reduce: first = {}, second = {}", first, second); return first.startsWith(markForFirst) ? first : markForFirst + first; }) .toStream() .filter((key, value) -> !value.startsWith(markForFirst)) .peek((key, value) -> log.debug("!!! OUTPUT: {} -> {}", key, value)); }
Обсуждают сегодня