следующую проблему:
есть обработчик кафкаТопик из которого приходят сигналы на то или иное действие которые необходимо сделать с объектом, т.е. взять его из кеша, обработать и обратно сохранить его в кеш. Все это выглядит следующем образом:
public class KafkaHandler : IMessageHandler<KafkaObj>
{
private readonly IMyService _service;
public Task Handle(IMessageContext context, KafkaObj data) => _service.Set(data);
}
public class MyService : IMyService
{
private readonly IMyCacheService _myCache;
private readonly IObjService _service;
public async Task Set(KafkaObj kafkaObj)
{
var cacheObj = await _myCache.Get(kafkaObj.Id);
await _service.Processing(cacheObj, kafkaObj.Command);
await _myCache.Save(cacheObj);
}
}
каждая "Command" меняет состояние у объекта cacheObj, и эти команды имеют приоритет. Поэтому появилась следующая проблема, если из кафки приходит две команды почти одновременно для одного и того же kafkaObj.Id, то в разных потоках из кэша берется один и тот же объект с нулевым состоянием, затем применяются обе команды и при сохранении в кеш перетирает друг друга, получается гонка потоков вообщем. Нужно сделать так, что если объект уже получен из кеша, то приостановить следующий запрос получения объекта из кеша, пока первый поток обратно не сохранит в кеш. А еще не маловажный момент, что если Id разное, то не нужно приостанавливать этот процесс
Как эту проблему можно решить?
IMyCacheService - это обертка над StackExchange.Redis.Extensions.Core.Abstractions.IRedisClient.
так может читать из очереди ПО ОЧЕРЕДИ?
по очереди долго, нужно в нескольких потоках
что такое долго в твоём понимании. Почему долго. Какие инварианты ты хочешь сохранить на доставку/очередность
а может по порядку для каждого определенного объекта?) начинает звучать как акторная модель)
если разные Id , то обрабатывать их в разных потоках
значит тебе надо партиционировать по ID
но есть еще одно НО, команды должны приходить параллельно на объект , а уже программа решает в какой последовательности изменять состояние у объекта
хера у вас дизайн!
А как это выглядит. Вот приходит мне одна команда из топика на id1, мне надо её применять сразу или подождать, может другая придёт?
начать выполнять действие, и если придет новая команда с приоритетом выше, приостановить, выполнить новую и продолжить приостановленную
Тогда не понимаю зачем их читать параллельно в пределах одного id
сейчас читаю все подряд без партицирования
поэтому из кеша берутся объекты с одинаковым состоянием и получается гонка перетирает состояния
Понимаю. Звучит как простая задача для любой стриминг либы - читать из топика - группировать по саб стримам по id - делать какую-то сложную агрегацию по приоритету Но у тебя их нет, подозреваю
В данном случае применение команд по приоритету и есть агрегация. Но мы отошли от вопроса! Думается мне тебе надо партиции переделать. Если не получится, то придется трахаться с ручной синхронизацией по id
но ты задачу понял на 100)
дохера партиций получается >100к
Обсуждают сегодня