используя таблицу в потсгрес. Данные в таблицу пишутся и читаются последовательно, после вычитки и асинхронной обработки данных планируется удаление строки из таблицы. Что производительнее, селектить батчами или использовать курсор?
Хочется организовать out of the box для реализации транзакционного KafkaProducer с гарантией at least once.
Зачем?
Есть процесс: вычитка из топика, запись в базу данных, запись в топик. Это распределенная транзакция, которая заключается в трёх этапах запись в базу, запись в топик, коммит офсета. Сейчас реализовано так, что транзакция висит пока ждет ответ от кафка сервера, что может быстро привести к потере коннектов к бд, если кафка сервер подвиснет. У меня нет идеи, как реализовать асинхронную отправку в кафку без риска потери данных.
Для начала, это не распределенная транзакция, это самая обычная транзакция, просто внутри нее происходит взаимодействие с внешним сервисом, что действительно не очень хорошо. Если боитесь, что Кафка ляжет, делайте в три этапа запись в базу со статусом новое сообщение, коммит, сбрасываете в Кафку, коммит в Кафке, далее update записи в БД, плюс сообщения должны накатываться в целевом сервисе идемпотентно, т.е. если очередность сброса сообщений в Кафку нарушена, это не должно ничего сломать на другом конце (вообще так лучше делать всегда, поскольку при многопоточке или скейлинге сервиса очередность сброса в Кафку может быть нарушена очень легко). Ну и воркер придется завести для периодической проверки и повторной попытки отправки отвалившихся сообщений. PS: Кафку можно развернуть в довольно отказоустойчивых вариантах и возможно все эти пляски с бубном несколько избыточны.
1) Подумать о том, что очереди — это сложная вещь, с РСУБД не очень пересекающаяся, потому, возможно, лучшэ взять для этого спец.инструмент. Кролика там или кафку. 2) Если это как-то сложно, или важно очереди скрещивать именно с транзакцыями СУБД — возможно, взять ужэ имеющийся pgq.
Вставить таймаут, пусть роллбэк делает если кафка подохла.
Почему это нераспределенная транзакция? Если в ней задействованы несколько систем? Я обычно тоже делаю через стейт машину. Но идея описанного мной решения в том, что при взаимодействии с унаследованным кодом, делать миграции и расширять логику не очень хочется, гораздо проще написать персистентный kafkaProducer и заинжектить его вместо стандартного. Предоставив коробочное решение. Плюс такого подхода, что не нужно ожидать от кафки в синхронном режиме ответ, а можно вычитывая таблицу отправлять в асинхронном режиме удаляя строки из таблицы при успешной отправке. Производительность сильно выше, чем ожидать ответ от кафки блокируясь, потому как продюсер тоже отправляет батчами при накоплении сообщений. На счёт "если боитесь, что Кафка ляжет" по мне так система, в которой важно, чтобы данные не пропадали должна быть настолько хорошо написана, что на любой строчке кода, при креше jvm данные не должны теряться.
Если хотите реализовывать отправку руками, то ответьте себе на несколько вопросов, у вас точно очередь (если очередь, то нельзя иметь несколько сервисов отправки, т.е. все жестко последовательно и в один поток и пока идет отправка, вы не сможете добавить элементы, транзакции на добавление будут висеть и ждать) и сможет ли ваш инструмент стримить курсор?
В вашей транзакции два участника имеющие транзакционность, без, как минимум одного дополнительного, это не распределенная транзакция, например в кафку закоммитилось, а у вас подтверждение помещения в кафку отвалилось и транзакция в постгресе откатилась, получили несогласованность, сообщение в кафке лежит, а в БД его нет.
самое идеальное для меня, это взять готовое решение, которое закрывает мои потребности. Но я, к сожалению не знаю подходящее. Если в таблицу добавить id producer'a, то из неё можно читать и писать несколькими продюсерами одновременно. Как и если расширение pgq+pgq-coop использовать. Спасибо, за подсказку, не знал о таком. не вижу проблем взять jdbc и через него реализовать стрим.
Смысл очереди в последовательности - FIFO, но как я понял вам не нужна очередь, а нужна просто гарантия отправки всего что упало в таблицу, если отправлять параллельно несколькими потоками или отдельными экземплярами сервиса, то еще может сильно skip locked пригодиться
нужна в рамках отельных продюсеров. В рамках всей таблицы не нужна. На инстанс приложения один продюсер со своим id, который используется для записи и вычитки сообщений.
вообще их три. Консюмер коммитящий оффсет, база данных, продюсер отправляющий в кафку и приложение, которое менеджит происходящее. Но понятно, что реализовать согласованную распределенную транзакцию это очень дорого для производительности, поэтому есть идемпотентный ключ и идея at least once при обработке, с сохраняющимся порядком обработки при удачном выполнении.
Странная архитектура честно говоря, но вам виднее.
У вас открытая транзакция ждет вычитки и сохранения сообщения другим сервисом?
ждет, что сообщение записалось в кафку с её гарантиями персистентности.
а есть аргументы? довольно часто встречается паттерн event sourcing. Просто его реализация может хромать, и чтобы добавить гарантий в систему, хочется сделать описанное решение. Можете предоставить себе систему, когда изменение состояния объекта проходит последовательно через несколько сервисов. Обычно в таких случаях bpmn используют, как координатор. но как это относится к исходному вопросу, я так и не понял🙃
селектить батчами или курсор?
Кафка используется для асинхронного взаимодействия сервисов, а вы хотите еще и асинхронного взаимодействия с самой кафкой, но делать ничего для этого не хотите, так чем мы можем вам помочь?) Может и есть решение из коробки, но я о таком не в курсе.
Вопрос не совсем понятен, курсор тоже может порциями вычитывать.
Можно посмотреть на temporal io - он как раз про ивент сорсинг
Почитайте про паттерн outbox
И в догонку про https://debezium.io/. Как раз то что вы описали
Для чего используется кафка понятно. Другой момент, как она используется без угрозы потерять сообщения. мне известно два варианта: 1. Использовать идемпотентноть и не коммитить оффсеты, пока не записали в потребитель, но тогда придётся блокироваться и ждать возвращаемый результат. 2. Писать в бд синхронно, коммитить оффсет и брать на себя ответственность за отправку сообщения меняя статус в бд. 3. Хотелось бы готовое решение для реализации второго варианта, о котором вы выше писали и которое я обычно использую. Собственно для этого и хочу использовать отдельную таблицу в постгресе для transaction out of box. Но есть сомнения, мб и первый вариант использовать.
так я изначально в первом сообщении про это и написал.
да тоже смотрел на него, сейчас нашёл, что есть embedded вариант, его возможно и использую
А что за он?
https://debezium.io/ про который вы выше писали
Обсуждают сегодня