172 похожих чатов

Всем привет! Можете подсказать, что лучше, если задача реализовать очередь

используя таблицу в потсгрес. Данные в таблицу пишутся и читаются последовательно, после вычитки и асинхронной обработки данных планируется удаление строки из таблицы. Что производительнее, селектить батчами или использовать курсор?

Хочется организовать out of the box для реализации транзакционного KafkaProducer с гарантией at least once.

27 ответов

32 просмотра

Зачем?

Pavel-Lygin Автор вопроса
central hardware
Зачем?

Есть процесс: вычитка из топика, запись в базу данных, запись в топик. Это распределенная транзакция, которая заключается в трёх этапах запись в базу, запись в топик, коммит офсета. Сейчас реализовано так, что транзакция висит пока ждет ответ от кафка сервера, что может быстро привести к потере коннектов к бд, если кафка сервер подвиснет. У меня нет идеи, как реализовать асинхронную отправку в кафку без риска потери данных.

Pavel Lygin
Есть процесс: вычитка из топика, запись в базу дан...

Для начала, это не распределенная транзакция, это самая обычная транзакция, просто внутри нее происходит взаимодействие с внешним сервисом, что действительно не очень хорошо. Если боитесь, что Кафка ляжет, делайте в три этапа запись в базу со статусом новое сообщение, коммит, сбрасываете в Кафку, коммит в Кафке, далее update записи в БД, плюс сообщения должны накатываться в целевом сервисе идемпотентно, т.е. если очередность сброса сообщений в Кафку нарушена, это не должно ничего сломать на другом конце (вообще так лучше делать всегда, поскольку при многопоточке или скейлинге сервиса очередность сброса в Кафку может быть нарушена очень легко). Ну и воркер придется завести для периодической проверки и повторной попытки отправки отвалившихся сообщений. PS: Кафку можно развернуть в довольно отказоустойчивых вариантах и возможно все эти пляски с бубном несколько избыточны.

1) Подумать о том, что очереди — это сложная вещь, с РСУБД не очень пересекающаяся, потому, возможно, лучшэ взять для этого спец.инструмент. Кролика там или кафку. 2) Если это как-то сложно, или важно очереди скрещивать именно с транзакцыями СУБД — возможно, взять ужэ имеющийся pgq.

Pavel Lygin
Есть процесс: вычитка из топика, запись в базу дан...

Вставить таймаут, пусть роллбэк делает если кафка подохла.

Pavel-Lygin Автор вопроса
Denis Novysh
Для начала, это не распределенная транзакция, это ...

Почему это нераспределенная транзакция? Если в ней задействованы несколько систем? Я обычно тоже делаю через стейт машину. Но идея описанного мной решения в том, что при взаимодействии с унаследованным кодом, делать миграции и расширять логику не очень хочется, гораздо проще написать персистентный kafkaProducer и заинжектить его вместо стандартного. Предоставив коробочное решение. Плюс такого подхода, что не нужно ожидать от кафки в синхронном режиме ответ, а можно вычитывая таблицу отправлять в асинхронном режиме удаляя строки из таблицы при успешной отправке. Производительность сильно выше, чем ожидать ответ от кафки блокируясь, потому как продюсер тоже отправляет батчами при накоплении сообщений. На счёт "если боитесь, что Кафка ляжет" по мне так система, в которой важно, чтобы данные не пропадали должна быть настолько хорошо написана, что на любой строчке кода, при креше jvm данные не должны теряться.

Если хотите реализовывать отправку руками, то ответьте себе на несколько вопросов, у вас точно очередь (если очередь, то нельзя иметь несколько сервисов отправки, т.е. все жестко последовательно и в один поток и пока идет отправка, вы не сможете добавить элементы, транзакции на добавление будут висеть и ждать) и сможет ли ваш инструмент стримить курсор?

Pavel Lygin
Почему это нераспределенная транзакция? Если в ней...

В вашей транзакции два участника имеющие транзакционность, без, как минимум одного дополнительного, это не распределенная транзакция, например в кафку закоммитилось, а у вас подтверждение помещения в кафку отвалилось и транзакция в постгресе откатилась, получили несогласованность, сообщение в кафке лежит, а в БД его нет.

Pavel-Lygin Автор вопроса
Denis Novysh
Если хотите реализовывать отправку руками, то отве...

самое идеальное для меня, это взять готовое решение, которое закрывает мои потребности. Но я, к сожалению не знаю подходящее. Если в таблицу добавить id producer'a, то из неё можно читать и писать несколькими продюсерами одновременно. Как и если расширение pgq+pgq-coop использовать. Спасибо, за подсказку, не знал о таком. не вижу проблем взять jdbc и через него реализовать стрим.

Pavel Lygin
самое идеальное для меня, это взять готовое решени...

Смысл очереди в последовательности - FIFO, но как я понял вам не нужна очередь, а нужна просто гарантия отправки всего что упало в таблицу, если отправлять параллельно несколькими потоками или отдельными экземплярами сервиса, то еще может сильно skip locked пригодиться

Pavel-Lygin Автор вопроса
Denis Novysh
Смысл очереди в последовательности - FIFO, но как ...

нужна в рамках отельных продюсеров. В рамках всей таблицы не нужна. На инстанс приложения один продюсер со своим id, который используется для записи и вычитки сообщений.

Pavel-Lygin Автор вопроса
Denis Novysh
В вашей транзакции два участника имеющие транзакци...

вообще их три. Консюмер коммитящий оффсет, база данных, продюсер отправляющий в кафку и приложение, которое менеджит происходящее. Но понятно, что реализовать согласованную распределенную транзакцию это очень дорого для производительности, поэтому есть идемпотентный ключ и идея at least once при обработке, с сохраняющимся порядком обработки при удачном выполнении.

Pavel Lygin
нужна в рамках отельных продюсеров. В рамках всей ...

Странная архитектура честно говоря, но вам виднее.

Pavel Lygin
вообще их три. Консюмер коммитящий оффсет, база да...

У вас открытая транзакция ждет вычитки и сохранения сообщения другим сервисом?

Pavel-Lygin Автор вопроса
Denis Novysh
У вас открытая транзакция ждет вычитки и сохранени...

ждет, что сообщение записалось в кафку с её гарантиями персистентности.

Pavel-Lygin Автор вопроса

а есть аргументы? довольно часто встречается паттерн event sourcing. Просто его реализация может хромать, и чтобы добавить гарантий в систему, хочется сделать описанное решение. Можете предоставить себе систему, когда изменение состояния объекта проходит последовательно через несколько сервисов. Обычно в таких случаях bpmn используют, как координатор. но как это относится к исходному вопросу, я так и не понял🙃

Pavel-Lygin Автор вопроса

селектить батчами или курсор?

Pavel Lygin
а есть аргументы? довольно часто встречается патте...

Кафка используется для асинхронного взаимодействия сервисов, а вы хотите еще и асинхронного взаимодействия с самой кафкой, но делать ничего для этого не хотите, так чем мы можем вам помочь?) Может и есть решение из коробки, но я о таком не в курсе.

Pavel Lygin
селектить батчами или курсор?

Вопрос не совсем понятен, курсор тоже может порциями вычитывать.

Pavel Lygin
а есть аргументы? довольно часто встречается патте...

Можно посмотреть на temporal io - он как раз про ивент сорсинг

И в догонку про https://debezium.io/. Как раз то что вы описали

Pavel-Lygin Автор вопроса
Denis Novysh
Кафка используется для асинхронного взаимодействия...

Для чего используется кафка понятно. Другой момент, как она используется без угрозы потерять сообщения. мне известно два варианта: 1. Использовать идемпотентноть и не коммитить оффсеты, пока не записали в потребитель, но тогда придётся блокироваться и ждать возвращаемый результат. 2. Писать в бд синхронно, коммитить оффсет и брать на себя ответственность за отправку сообщения меняя статус в бд. 3. Хотелось бы готовое решение для реализации второго варианта, о котором вы выше писали и которое я обычно использую. Собственно для этого и хочу использовать отдельную таблицу в постгресе для transaction out of box. Но есть сомнения, мб и первый вариант использовать.

Pavel-Lygin Автор вопроса
Nikolay Underground
Почитайте про паттерн outbox

так я изначально в первом сообщении про это и написал.

Pavel-Lygin Автор вопроса
Constantine
И в догонку про https://debezium.io/. Как раз то ч...

да тоже смотрел на него, сейчас нашёл, что есть embedded вариант, его возможно и использую

А что за он?

Pavel-Lygin Автор вопроса
Constantine
А что за он?

https://debezium.io/ про который вы выше писали

Похожие вопросы

Обсуждают сегодня

а зачем этот вопрос для удаления из чата?
Mёdkinson Medvezhkin
63
Добрый день. Хочу сделать отрисовку по команде на панели. Почему-то рисуется только при втором вызове. С чем может быть связано, не подскажете? procedure TForm1.FormDblClick(...
Kirill Filippenok
20
Всем доброго дня! Подскажите может кто использовал связку Pagebuilder + Clientsetting. Сами параметры с типом pagebuilder в модуле Clientsetting работают нормально, можно такж...
Александр Добриков
12
А почему в си некоторые вещи работают с двойными кавычками некоторые с одинарными? Нельзя было все сделать с одними или чтоб работало с разными? например чтоб выводить строки ...
.
15
Всем привет! Нужен совет от опытных. Переношу свой проект с Делфи 10.2 Токио на Лазарус 3.2 установленный через инсталлятор fpcupdeluxe-x86_64-win64. При импортировании проект...
Дмитрий Завгородний
7
Эх кто-то пришел и весь праздник испортил :( You need complex FBX scene importing setup to change things on import? good luck with that. You need navigation and pathfinding? g...
Serg Gini
5
Всем привет! Подскажите. Я написал приложение на Delphi 10.2 Tokyo под Windows 10. И передо мной стал вопрос о том чтобы сделать это приложение кроссплатформенным (под Linux и...
Дмитрий Завгородний
24
Всем привет! procedure TForm1.FormCreate(Sender: TObject); type TStartEnd = record S: Byte; E: Byte; end; var a, b: TStartEnd; begin {1} a.S := 1; {2} a.E := 2; ...
Руслан Михайлович
10
Всем привет!) я тут новенький и пытаюсь освоить evolution методом тыка. У меня при переходе между папками файлов выскакивают вот такие уведомления Можете подсказать как их от...
Диман Samoed
10
Какого хера? /Sources/App/Modules/User/Models/UserLinkApple.swift:21:20: warning: stored property '_id' of 'Sendable'-conforming class 'UserLinkApple' is mutable @ID(...
Alexander Sherbakov
14
Карта сайта