Привет ребят, есть такая проблемка, если это проблемка вообще) Есть java,

есть кафка стрим 2.6, примерно такой конфиг:
StreamsConfig.PROCESSING_GUARANTEE_CONFIG=StreamsConfig.EXACTLY_ONCE_BETA
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG=Serdes.String().getClass().getName()
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG=Serdes.ByteArray().getClass().getName()
StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG="latest"
StreamsConfig.REPLICATION_FACTOR_CONFIG=3
StreamsConfig.TOPIC_PREFIX + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG=2
StreamsConfig.PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG="all"
StreamsConfig.NUM_STREAM_THREADS_CONFIG=3
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG=0
Енв локальный - три брокера, стрим простой, один сорс 100 партишенов, один синк 100 партишенов, два стейт стора и один процессор, который кладет в сторы хедер, и форвардит мессадж в синк. Проблема в том, что именно коммит мессаджа(в том числе стора), может занять до 200 мс(в среднем 80-100мс), даже при данном коммит интервале - 0. Вопрос, почему так долго и отчего это может зависеть? Спасибо.

6 ответов

8 просмотров

А что в топологии делаешь? Есть код?

Dmitry-Krasaev Автор вопроса
Vik Gamov
А что в топологии делаешь? Есть код?

привет, примерно так Topology topology = new Topology() .addSource("in", Serdes.String().deserializer(), Serdes.ByteArray().deserializer(), "in-topic") .addProcessor("p", () -> new AbstractProcessor<String, byte[]>() { @Override public void process(String key, byte[] value) { StateStore store1 = context().getStateStore("store1"); StateStore store2 = context().getStateStore("store2"); ((KeyValueStore) store1).put(key, UUID.randomUUID().toString()); ((KeyValueStore) store2).put(key, UUID.randomUUID().toString()); context().headers().add("someHeader", "someHeaderValue".getBytes(StandardCharsets.UTF_8)); context().forward(key, value); } }, "in") .addStateStore(Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("store1"), Serdes.String(), Serdes.String()), "p") .addStateStore(Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("store2"), Serdes.String(), Serdes.String()), "p") .addSink("out", "out-topic", Serdes.String().serializer(), Serdes.ByteArray().serializer(), "p"); проект чистый, никаких аспектов и другой магии(помимо айдийки), которая могла бы влиять на результаты. Спасибо.

Dmitry-Krasaev Автор вопроса
Vik Gamov
А что в топологии делаешь? Есть код?

Нет идей, куда можно посмотреть? С праздником всех, btw.

Dmitry-Krasaev Автор вопроса
Vik Gamov
А метрики снять можешь?

а какие метрики нужны?) и какой тулой лучше снять? накидал простой проект https://github.com/krasaev/kafka-stream-test

Похожие вопросы

Обсуждают сегодня

Всем привет. Подскажите, почему не меняется значение поля при переключении сайта?
Alexander Peterikov
11
Можно ли загрузить скрипт py в бота чтобы он работал по нему? как это сделать?
huskadam #RCC Фанат? @hitlerpvp
13
'frakturBold' => ['𝖆', '𝖇', '𝖈', '𝖉', '𝖊', '𝖋', '𝖌', '𝖍', '𝖎', '𝖏', '𝖐', '𝖑', '𝖒', '𝖓', '𝖔', '𝖕', '𝖖', '𝖗', '𝖘', '𝖙', '𝖚', '𝖛', '𝖜', '𝖝', '𝖞', '𝖟', '𝕬', '𝕭', '𝕮', '𝕯'...
Roma
4
Приветствую друзья, подскажите сколько в среднем стоит на данный момент создать тг бота который будет как магазин? Показывать ассортименты доставлять заказы и тд? Все по станд...
Eugene
3
Добрый день, не подскажите, если в OC-V3 поменять страндартную директорию /storage/ на /storage2/ - не будет сильно много проблем ?
Max Dubovsky
32
А вот из практических задач на работе, кто работает расскажите относительно задач на работе, как вообще выживаете. Если есть желание, интересно тоже что и как сейчас с этим . ...
...
2
Ребят, а за скок можно впарить анон чат с апишкой и веб админкой ?
Eugene Неелов
15
Привет. На сайте с видео установлена защита, не позволяющая скачивать видео, делать скриншоты и скринкасты, но это работает только с пк и устройств эпл. С андроида работают ск...
Lencore
1
Добрый день! Кто-нибудь знает как подключить твиг в контроллеры плагина?
Николай Афанасенко
5
@dblackCat Привет. Это же твой плагин? https://octobercms.com/plugin/catdesign-productbundle
Alexey Yakimov
5
Карта сайта