это всегда минус гарантия
Но у меня проблема с финальном топиком, где накапливаются пакеты: вижу дубли.
оконную надо ставить именно на стриме сбора батчей. тогда если есть 2 входных топика, которые группируются, то каждое сообщение в рамках каждого входного топика всё равно будет обрабатываться после предыдущего. когда окно закрывается - в каждом входном топике фиксируется оффсет последнего сообщения, вошедшего в окно. либо фиксируется, либо нет. мало того, кафка поддерживает транзакции, и можно в одной транзакции положить сообщение в результирующий топик и закрепить сообщения, собранные за это окно. (запись в одной транзакции в результирующий топик и в consumer-offsets для входного) и не важно, сколько партиций/топиков на входе, гарантия доставки будет соблюдена, а каждое сообщение попадёт в выходной топик только один раз.
Так оно и есть. Вот код, он небольшой: final var stream = kStreamBuilder .stream(topic, Consumed.with(Serdes.String(), CustomJsonSerde.createDocSerde())) .selectKey((key, value) -> value.bic()) .groupByKey(Grouped.with(Serdes.String(), CustomJsonSerde.createDocSerde())) .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(periodLimit), Duration.ofSeconds(periodLimit))) .aggregate(() -> {
Обсуждают сегодня