есть кафка стрим 2.6, примерно такой конфиг:
StreamsConfig.PROCESSING_GUARANTEE_CONFIG=StreamsConfig.EXACTLY_ONCE_BETA
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG=Serdes.String().getClass().getName()
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG=Serdes.ByteArray().getClass().getName()
StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG="latest"
StreamsConfig.REPLICATION_FACTOR_CONFIG=3
StreamsConfig.TOPIC_PREFIX + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG=2
StreamsConfig.PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG="all"
StreamsConfig.NUM_STREAM_THREADS_CONFIG=3
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG=0
Енв локальный - три брокера, стрим простой, один сорс 100 партишенов, один синк 100 партишенов, два стейт стора и один процессор, который кладет в сторы хедер, и форвардит мессадж в синк. Проблема в том, что именно коммит мессаджа(в том числе стора), может занять до 200 мс(в среднем 80-100мс), даже при данном коммит интервале - 0. Вопрос, почему так долго и отчего это может зависеть? Спасибо.
А что в топологии делаешь? Есть код?
привет, примерно так Topology topology = new Topology() .addSource("in", Serdes.String().deserializer(), Serdes.ByteArray().deserializer(), "in-topic") .addProcessor("p", () -> new AbstractProcessor<String, byte[]>() { @Override public void process(String key, byte[] value) { StateStore store1 = context().getStateStore("store1"); StateStore store2 = context().getStateStore("store2"); ((KeyValueStore) store1).put(key, UUID.randomUUID().toString()); ((KeyValueStore) store2).put(key, UUID.randomUUID().toString()); context().headers().add("someHeader", "someHeaderValue".getBytes(StandardCharsets.UTF_8)); context().forward(key, value); } }, "in") .addStateStore(Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("store1"), Serdes.String(), Serdes.String()), "p") .addStateStore(Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("store2"), Serdes.String(), Serdes.String()), "p") .addSink("out", "out-topic", Serdes.String().serializer(), Serdes.ByteArray().serializer(), "p"); проект чистый, никаких аспектов и другой магии(помимо айдийки), которая могла бы влиять на результаты. Спасибо.
Нет идей, куда можно посмотреть? С праздником всех, btw.
А метрики снять можешь?
а какие метрики нужны?) и какой тулой лучше снять? накидал простой проект https://github.com/krasaev/kafka-stream-test
Jvisualvm может imx метрики собрать
Обсуждают сегодня