170 похожих чатов

Подскажите плиз, есть ли какой либо механизм которые решал бы

следующую проблему:

есть обработчик кафкаТопик из которого приходят сигналы на то или иное действие которые необходимо сделать с объектом, т.е. взять его из кеша, обработать и обратно сохранить его в кеш. Все это выглядит следующем образом:

public class KafkaHandler : IMessageHandler<KafkaObj>
{
private readonly IMyService _service;

public Task Handle(IMessageContext context, KafkaObj data) => _service.Set(data);
}

public class MyService : IMyService
{
private readonly IMyCacheService _myCache;
private readonly IObjService _service;

public async Task Set(KafkaObj kafkaObj)
{
var cacheObj = await _myCache.Get(kafkaObj.Id);

await _service.Processing(cacheObj, kafkaObj.Command);
await _myCache.Save(cacheObj);
}
}

каждая "Command" меняет состояние у объекта cacheObj, и эти команды имеют приоритет. Поэтому появилась следующая проблема, если из кафки приходит две команды почти одновременно для одного и того же kafkaObj.Id, то в разных потоках из кэша берется один и тот же объект с нулевым состоянием, затем применяются обе команды и при сохранении в кеш перетирает друг друга, получается гонка потоков вообщем. Нужно сделать так, что если объект уже получен из кеша, то приостановить следующий запрос получения объекта из кеша, пока первый поток обратно не сохранит в кеш. А еще не маловажный момент, что если Id разное, то не нужно приостанавливать этот процесс
Как эту проблему можно решить?

IMyCacheService - это обертка над StackExchange.Redis.Extensions.Core.Abstractions.IRedisClient.

18 ответов

23 просмотра

так может читать из очереди ПО ОЧЕРЕДИ?

Timur- Автор вопроса

по очереди долго, нужно в нескольких потоках

что такое долго в твоём понимании. Почему долго. Какие инварианты ты хочешь сохранить на доставку/очередность

Timur
по очереди долго, нужно в нескольких потоках

а может по порядку для каждого определенного объекта?) начинает звучать как акторная модель)

Timur- Автор вопроса
Ayrat Hudaygulov
что такое долго в твоём понимании. Почему долго. К...

если разные Id , то обрабатывать их в разных потоках

Timur
если разные Id , то обрабатывать их в разных поток...

значит тебе надо партиционировать по ID

Timur- Автор вопроса
Ayrat Hudaygulov
значит тебе надо партиционировать по ID

но есть еще одно НО, команды должны приходить параллельно на объект , а уже программа решает в какой последовательности изменять состояние у объекта

Timur
но есть еще одно НО, команды должны приходить пара...

А как это выглядит. Вот приходит мне одна команда из топика на id1, мне надо её применять сразу или подождать, может другая придёт?

Timur- Автор вопроса
Ayrat Hudaygulov
А как это выглядит. Вот приходит мне одна команда ...

начать выполнять действие, и если придет новая команда с приоритетом выше, приостановить, выполнить новую и продолжить приостановленную

Timur
начать выполнять действие, и если придет новая ком...

Тогда не понимаю зачем их читать параллельно в пределах одного id

Timur- Автор вопроса
Ayrat Hudaygulov
Тогда не понимаю зачем их читать параллельно в пре...

сейчас читаю все подряд без партицирования

Timur- Автор вопроса
Ayrat Hudaygulov
Тогда не понимаю зачем их читать параллельно в пре...

поэтому из кеша берутся объекты с одинаковым состоянием и получается гонка перетирает состояния

Понимаю. Звучит как простая задача для любой стриминг либы - читать из топика - группировать по саб стримам по id - делать какую-то сложную агрегацию по приоритету Но у тебя их нет, подозреваю

Timur
агрегации? нетт

В данном случае применение команд по приоритету и есть агрегация. Но мы отошли от вопроса! Думается мне тебе надо партиции переделать. Если не получится, то придется трахаться с ручной синхронизацией по id

Timur- Автор вопроса

Похожие вопросы

Обсуждают сегодня

Господа, а что сейчас вообще с рынком труда на делфи происходит? Какова ситуация?
Rꙮman Yankꙮvsky
29
А вообще, что может смущать в самой Julia - бы сказал, что нет единого стандартного подхода по многим моментам, поэтому многое выглядит как "хаки" и произвол. Короче говоря, с...
Viktor G.
2
30500 за редактор? )
Владимир
47
а через ESC-код ?
Alexey Kulakov
29
Чёт не понял, я ж правильной функцией воспользовался чтобы вывести отладочную информацию? но что-то она не ловится
notme
18
У меня есть функция где происходит это: write_bit(buffer, 1); write_bit(buffer, 0); write_bit(buffer, 1); write_bit(buffer, 1); write_bit(buffer, 1); w...
~
14
Добрый день! Скажите пожалуйста, а какие программы вы бы рекомендовали написать для того, чтобы научиться управлять памятью? Можно написать динамический массив, можно связный ...
Филипп
7
Недавно Google Project Zero нашёл багу в SQLite с помощью LLM, о чём достаточно было шумно в определённых интернетах, которые сопровождались рассказами, что скоро всех "ибешни...
Alex Sherbakov
5
Ребят в СИ можно реализовать ООП?
Николай
33
https://github.com/erlang/otp/blob/OTP-27.1/lib/kernel/src/logger_h_common.erl#L174 https://github.com/erlang/otp/blob/OTP-27.1/lib/kernel/src/logger_olp.erl#L76 15 лет назад...
Maksim Lapshin
20
Карта сайта