Приветствую, коллеги! Такая ситуация, приходят сообщения из RabbitMQ в приложение

на Spring Boot и там асинхронно перекладываются в другую очередь, если сообщение успешно переложилось, то в раббит посылается ack, если за определённый таймаут не обработалось, то посылается nack и сообщение возвращается в раббит и тут же пересылается обратно в приложение.
Из-за того, что сообщения перекладываются асинхронно, то возникают дубликаты. Можно как-нибудь провести дедупликацию таких сообщений на уровне настроек раббита? Например транзитивность или что-то такое?

14 ответов

23 просмотра

А откуда рэббиту знать какое сообщение обработалось, а какое нет. Он свой nack получил - обязан доставить ещё раз

Dmitry- Автор вопроса
d.
А откуда рэббиту знать какое сообщение обработалос...

Это верно, видимо придётся уходить от асинхронности к монопоточности... Хотелось всё же убедиться что других вариантов нет

Dmitry
Это верно, видимо придётся уходить от асинхронност...

Не поможет. Можно анализировать redelivery флаг, это уже будет ближе к решению

Dmitry- Автор вопроса
d.
Не поможет. Можно анализировать redelivery флаг, э...

а как поможет redelivery флаг? если у нас сообщение в одном потоке обрабатывается (потеряно соединение и вызываются ретраи), приходит это же сообщение с redelivery флаг=true и отправляется в другой поток - как итог когда соединение восстанавливается - оба сообщения уходит в другую очередь

Dmitry
а как поможет redelivery флаг? если у нас сообщени...

Флаг поможет понять, что уже однажды сообщение пытались обработать и вероятно его вообще не нужно обрабатывать и/или нужно проверить на дубликат, возможно в том числе вручную. Всё зависит от конкретной реализации

> из-за того, что сообщения перекладываются асинхронно, возникают дубликаты А какая связь? Асинхронность не равно параллельность. В любом случае почему не взять shovel?

Dmitry- Автор вопроса
Vadim
> из-за того, что сообщения перекладываются асинхр...

Тут связь в следующем: мы асинхронно кладём сообщение в отдельный поток, и если оно на определённый таймаут не отправляется в другую очередь, мы выбрасываем исключение в главном потоке и через nack возвращаем сообщение в раббит, которое потом снова приходит в главный поток. Кажется, у нас асинхронность нужна только для таймаута по которому возвращать сообщение в раббит. За shovel спасибо - почитаю о нём

Dmitry
Тут связь в следующем: мы асинхронно кладём сообще...

Для таймаута лучше использовать паттерн очереди повторных попыток в вашем случае кмк

Dmitry- Автор вопроса
Aleksey Barabanov
Для таймаута лучше использовать паттерн очереди по...

Спасибо, а где можно почитать про этот паттерн? Это аналог BCQ очереди в IBM MQ?

Dmitry
Спасибо, а где можно почитать про этот паттерн? Эт...

В закрепе ссылка на курс, там в типовом использовании вроде я описывал как настроить такую очередь

Dmitry
Спасибо, а где можно почитать про этот паттерн? Эт...

Да, оно, суть таже, только рэббит не умеет считать количество попыток (или поправили уже? Давно не следил)

Aleksey Barabanov
В закрепе ссылка на курс, там в типовом использова...

https://gitlab.com/kilexst/rabbitmq-docs/-/tree/main/3%20-%20%D0%A2%D0%B8%D0%BF%D0%BE%D0%B2%D0%BE%D0%B5%20%D0%B8%D1%81%D0%BF%D0%BE%D0%BB%D1%8C%D0%B7%D0%BE%D0%B2%D0%B0%D0%BD%D0%B8%D0%B5#%D0%BE%D1%87%D0%B5%D1%80%D0%B5%D0%B4%D1%8C-%D0%BF%D0%BE%D0%B2%D1%82%D0%BE%D1%80%D0%BD%D1%8B%D1%85-%D0%BF%D0%BE%D0%BF%D1%8B%D1%82%D0%BE%D0%BA

Похожие вопросы

Обсуждают сегодня

Добрый день! Удалил все файлы с переменными из проекта, получил Error: release deploy: process resources: error validating adoptable resources: adoption validation failed: re...
Evgheni Mad
2
@aigrychev, @ilya_lesikov добрый день! а поддерживает ли werf helm xxxx или werf bundle xxxx работу с сабчартами через http-прокси? (сработает ли использование HTTP_PROXY/HTTP...
Сергей Голод
4
Привет! Вопрос про werf helm Приложение деплоится через werf helm upgrade --atomic Иногда(все условия для воспроизведения до конца непонятны, но есть версия, что это происходи...
𝓐𝓵͢͢͢𝓮𝔁 C
2
А как подмаунтить каталог если я не буду стапель юзать, а просто Докерфайл?
yoshi kakbudto
3
Всем привет. Werf v2.10.5 При удалении релиза вместе с неймспейсом (werf dismiss --namespace namespace_name) Сыпятся ошибки ┌ Waiting for resources elimination: namespaces/rel...
Vitalik Petrov
1
Всем привет. Сегодня добавили в приложение дополнительный образ nginx, в который докидывается системная статика прям в образ. При деплое бандлами деплоилось 200+ джоб(клиентов...
Владимир Муковоз
6
Добрый день, подскажите пожалуйста, а как поле project (в werf.yaml) параметризовать ? werf converge —project <APP_NAME> в одном общем репо держим 1 хельм чарт и деплоим с ...
Sulaymon
5
Вопросик не совсем werf. Но вдруг мы подскажите воркэраунд или ещё что-нибудь. Могу ли я как-нибудь в моменте деплоя внутри heml рендера получить хэшсумму файла шаблона (./tem...
Alex Подрябинкин
11
Друзья, добрый день. Прошу подсказать с базовым вопросом по использованию CI переменных gitlab в werf.yaml. Хочу в beforeInstall использовать env переменную с токеном. Мне нуж...
Anton Zol
10
Добрый день, после перехода с версии 1.2 на 2.10 werf cleanup начал удалять использующиеся теги, и до и после обновления использовались дефолтные политики keepPolicies Подскаж...
Дмитрий
29
Карта сайта