Привет ребят, есть такая проблемка, если это проблемка вообще) Есть java,

есть кафка стрим 2.6, примерно такой конфиг:
StreamsConfig.PROCESSING_GUARANTEE_CONFIG=StreamsConfig.EXACTLY_ONCE_BETA
StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG=Serdes.String().getClass().getName()
StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG=Serdes.ByteArray().getClass().getName()
StreamsConfig.CONSUMER_PREFIX + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG="latest"
StreamsConfig.REPLICATION_FACTOR_CONFIG=3
StreamsConfig.TOPIC_PREFIX + TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG=2
StreamsConfig.PRODUCER_PREFIX + ProducerConfig.ACKS_CONFIG="all"
StreamsConfig.NUM_STREAM_THREADS_CONFIG=3
StreamsConfig.COMMIT_INTERVAL_MS_CONFIG=0
Енв локальный - три брокера, стрим простой, один сорс 100 партишенов, один синк 100 партишенов, два стейт стора и один процессор, который кладет в сторы хедер, и форвардит мессадж в синк. Проблема в том, что именно коммит мессаджа(в том числе стора), может занять до 200 мс(в среднем 80-100мс), даже при данном коммит интервале - 0. Вопрос, почему так долго и отчего это может зависеть? Спасибо.

6 ответов

17 просмотров

А что в топологии делаешь? Есть код?

Dmitry-Krasaev Автор вопроса
Vik Gamov
А что в топологии делаешь? Есть код?

привет, примерно так Topology topology = new Topology() .addSource("in", Serdes.String().deserializer(), Serdes.ByteArray().deserializer(), "in-topic") .addProcessor("p", () -> new AbstractProcessor<String, byte[]>() { @Override public void process(String key, byte[] value) { StateStore store1 = context().getStateStore("store1"); StateStore store2 = context().getStateStore("store2"); ((KeyValueStore) store1).put(key, UUID.randomUUID().toString()); ((KeyValueStore) store2).put(key, UUID.randomUUID().toString()); context().headers().add("someHeader", "someHeaderValue".getBytes(StandardCharsets.UTF_8)); context().forward(key, value); } }, "in") .addStateStore(Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("store1"), Serdes.String(), Serdes.String()), "p") .addStateStore(Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("store2"), Serdes.String(), Serdes.String()), "p") .addSink("out", "out-topic", Serdes.String().serializer(), Serdes.ByteArray().serializer(), "p"); проект чистый, никаких аспектов и другой магии(помимо айдийки), которая могла бы влиять на результаты. Спасибо.

Dmitry-Krasaev Автор вопроса
Vik Gamov
А что в топологии делаешь? Есть код?

Нет идей, куда можно посмотреть? С праздником всех, btw.

Dmitry-Krasaev Автор вопроса
Vik Gamov
А метрики снять можешь?

а какие метрики нужны?) и какой тулой лучше снять? накидал простой проект https://github.com/krasaev/kafka-stream-test

Похожие вопросы

Обсуждают сегодня

Ребята, всем привет. Подскажите, пожалуйста, можно ли как-то через бота понять, что этого бота добавили в группу\канал и выдали ему права администратора?
Artem Stormageddon
9
Это переведённый текст с английского. Я не говорю на русском, но могу использовать переводчик Телеграм. Приветствую! Я начинающий веб-разработчик и все еще учусь. В настояще...
𐩱𐩪𐩣𐩱𐩲𐩺𐩡
3
А не хотим ли мы развлечься? 😉 Но так чтобы с пользой для наших профессиональных навыков?? 👨‍🎓👩‍🎓 Предлагаю на октябрь запланировать тестовый запуск новой командной игры "Игр...
Andrii Kurdiumov
2
Привет всем! Почему этот код не срабатывает при добавлении или удалении пользователя из чата? bot.on('chat_member', async (ctx) => { console.log(ctx); }) bot.launch({allo...
Alexander
5
у кого сколько оперативы на базе данных ?
АДИЛЬБЕК
4
Через бот апи возможно получить ID стикерпака? Не ссылку.
Vexylon [АФК до 09.09]
5
Привет Хочу сделать аналог iCloud’а для своих проектов, чтобы пользовательская информация хранилась в облаке, была доступна во всех сервисах, её можно было подсасывать везде)...
Виталий
9
В тг можно спарсить всех кто пишет в группе? Если список участников скрыт
S
3
код Event::listen('cms.page.display', function (&$content, $slug, $page, $html) { if (is_object($content)) { dump($content); } else { dump($s...
Point 111
3
Всем привет. Не понимаю, в чём тут шутка юмора. Убирается только разрешение на send_messages. А send_media_messages остаётся. Как сделать, чтобы оба убирались? await b...
Alexander
2
Карта сайта