170 похожих чатов

Подскажите плиз, есть ли какой либо механизм которые решал бы

следующую проблему:

есть обработчик кафкаТопик из которого приходят сигналы на то или иное действие которые необходимо сделать с объектом, т.е. взять его из кеша, обработать и обратно сохранить его в кеш. Все это выглядит следующем образом:

public class KafkaHandler : IMessageHandler<KafkaObj>
{
private readonly IMyService _service;

public Task Handle(IMessageContext context, KafkaObj data) => _service.Set(data);
}

public class MyService : IMyService
{
private readonly IMyCacheService _myCache;
private readonly IObjService _service;

public async Task Set(KafkaObj kafkaObj)
{
var cacheObj = await _myCache.Get(kafkaObj.Id);

await _service.Processing(cacheObj, kafkaObj.Command);
await _myCache.Save(cacheObj);
}
}

каждая "Command" меняет состояние у объекта cacheObj, и эти команды имеют приоритет. Поэтому появилась следующая проблема, если из кафки приходит две команды почти одновременно для одного и того же kafkaObj.Id, то в разных потоках из кэша берется один и тот же объект с нулевым состоянием, затем применяются обе команды и при сохранении в кеш перетирает друг друга, получается гонка потоков вообщем. Нужно сделать так, что если объект уже получен из кеша, то приостановить следующий запрос получения объекта из кеша, пока первый поток обратно не сохранит в кеш. А еще не маловажный момент, что если Id разное, то не нужно приостанавливать этот процесс
Как эту проблему можно решить?

IMyCacheService - это обертка над StackExchange.Redis.Extensions.Core.Abstractions.IRedisClient.

18 ответов

19 просмотров

так может читать из очереди ПО ОЧЕРЕДИ?

Timur- Автор вопроса

по очереди долго, нужно в нескольких потоках

что такое долго в твоём понимании. Почему долго. Какие инварианты ты хочешь сохранить на доставку/очередность

Timur
по очереди долго, нужно в нескольких потоках

а может по порядку для каждого определенного объекта?) начинает звучать как акторная модель)

Timur- Автор вопроса
Ayrat Hudaygulov
что такое долго в твоём понимании. Почему долго. К...

если разные Id , то обрабатывать их в разных потоках

Timur
если разные Id , то обрабатывать их в разных поток...

значит тебе надо партиционировать по ID

Timur- Автор вопроса
Ayrat Hudaygulov
значит тебе надо партиционировать по ID

но есть еще одно НО, команды должны приходить параллельно на объект , а уже программа решает в какой последовательности изменять состояние у объекта

Timur
но есть еще одно НО, команды должны приходить пара...

А как это выглядит. Вот приходит мне одна команда из топика на id1, мне надо её применять сразу или подождать, может другая придёт?

Timur- Автор вопроса
Ayrat Hudaygulov
А как это выглядит. Вот приходит мне одна команда ...

начать выполнять действие, и если придет новая команда с приоритетом выше, приостановить, выполнить новую и продолжить приостановленную

Timur
начать выполнять действие, и если придет новая ком...

Тогда не понимаю зачем их читать параллельно в пределах одного id

Timur- Автор вопроса
Ayrat Hudaygulov
Тогда не понимаю зачем их читать параллельно в пре...

сейчас читаю все подряд без партицирования

Timur- Автор вопроса
Ayrat Hudaygulov
Тогда не понимаю зачем их читать параллельно в пре...

поэтому из кеша берутся объекты с одинаковым состоянием и получается гонка перетирает состояния

Понимаю. Звучит как простая задача для любой стриминг либы - читать из топика - группировать по саб стримам по id - делать какую-то сложную агрегацию по приоритету Но у тебя их нет, подозреваю

Timur
агрегации? нетт

В данном случае применение команд по приоритету и есть агрегация. Но мы отошли от вопроса! Думается мне тебе надо партиции переделать. Если не получится, то придется трахаться с ручной синхронизацией по id

Timur- Автор вопроса

Похожие вопросы

Обсуждают сегодня

Всем привет! Имеется функция: function IsValidChar(ch: UTF8Char): Boolean; var i: Integer; ValidChars: AnsiString; begin ValidChars := 'abcdefghijklmnopqrstuvwxyzABCDE...
Евгений
44
Чтобы перехватить все нажимания буков на форме, надо хук ставить? Пробовал на форме ОнКейДаун, оно ловит клаву если фокус не на компоненте с вводом текста
Serjone
15
лучше скажите, причём тут паскаль?
Alexey Kulakov
36
Всем привет! вывожу на общей стр дочерние ресурсыв каждом ресурсе галерея, и первая фотка должна выводиться на общей [!DocLister? &prepare=photo !]
Alekso
12
А можно вопрос? Мне сегодня сказали что у меня функция (которая просто заполняет массив значениями) не правильная void Full(double * arr, int n) { for (int i = 0; i < n; i...
† C E †
7
День добрый, подскажите пожалуйста, есть ли какой-то способ сказать ребару не компилировать определённое приложение? Всю доку их перечиатл ничего подобного не нашёл
Кирилл
14
Добрый вечер. Хочу чтобы у меня в классе поле было функцией, которая возвращает строку. Делаю так: interface ... TGetOutPath = function : String of object; ... protec...
Kirill Filippenok
12
Народ! Впервые клиенту пришло письмо от РКН, у вас, дескать, есть яндекс метрика, а нигде не написано, что вы ее юзаете. Никто не сталкивался?
Sasha Beep
10
Это может быть все-таки не флудвейт? у меня ботфазер принимает изменения и отображает даже что они изменились, на видео видно что он прислал якобы уже измененное описание, н...
OVERLINK
13
Здравствуйте, хочу сделать HelloWorld в консоли Дельфи, но функция API ничего не выводит, что я делаю не так? program Hello; {$APPTYPE CONSOLE} uses System.SysUtils, WinAPI.Wi...
Sergey Vinogradov
20
Карта сайта