Коллеги, использую aio-pika (python), для работы с rabbitmq использую connect_robust, чтобы

происходил реконнект
сообщения обрабатываю через метод IncomingMessage.process
но, когда соединение рвется, то обрабатываемое сообщение теряется, как я понимаю из-за закрытия канала в коннекте
робуст при реконнекте, почему то создает новый канал, а не поднимает старый
В доке информации очень скудно, в гугл правильный запрос подобрать немогу((
Думал поискать в коде, но там не нахожу каких-то методов для гарантированной отпраки

Подскажите, пожалуйста, как мне изменить обработку таким образом, чтобы сообщение точно отправилось, после восстановления соединения?
Простите, если прям не профильно(

2 ответов

82 просмотра

Что значит теряется? В процессе выполнения хендлера не обрабатывается? Не возвращается в очередь? Что значит точно отправилось? Отправка это публиш, а обработка это консьюм.

Patsy-Charmer Автор вопроса
Aleksey Barabanov
Что значит теряется? В процессе выполнения хендлер...

Я получил сообщение, начинаю его обработку. я делаю так: async with message.process(ignore_processed=True, reject_on_redelivered=True): result = await some_logic(message.body) await exchange.publish(result, ..) соединение рвется во время выполнения some_logic и до отправки еще не дошли по идее с флагом ignore_processed исключение не должно влиять на обработку сообщения, но обработка прерывается с логом Reject is not sent since channel is closed По идее, как я понял, я могу пометить сообщение аск, сколько угодно готовить ответ и отправить его (паблиш), в тот канал, что будет доступен на момент отправки в нужный обменник.

Похожие вопросы

Обсуждают сегодня

Всем привет. Понимаю, что, наверное, сто раз поднимали эту тему, но по ключевым словам не смог найти. Как передать в values.yaml зависимого хелм-чарта теги образов, собираемых...
Vitalik Petrov
4
Всем привет. Werf v2.10.5 При удалении релиза вместе с неймспейсом (werf dismiss --namespace namespace_name) Сыпятся ошибки ┌ Waiting for resources elimination: namespaces/rel...
Vitalik Petrov
1
@aigrychev, @ilya_lesikov добрый день! а поддерживает ли werf helm xxxx или werf bundle xxxx работу с сабчартами через http-прокси? (сработает ли использование HTTP_PROXY/HTTP...
Сергей Голод
4
Добрый день! Удалил все файлы с переменными из проекта, получил Error: release deploy: process resources: error validating adoptable resources: adoption validation failed: re...
Evgheni Mad
2
Привет! Вопрос про werf helm Приложение деплоится через werf helm upgrade --atomic Иногда(все условия для воспроизведения до конца непонятны, но есть версия, что это происходи...
𝓐𝓵͢͢͢𝓮𝔁 C
2
Всем привет. Сегодня добавили в приложение дополнительный образ nginx, в который докидывается системная статика прям в образ. При деплое бандлами деплоилось 200+ джоб(клиентов...
Владимир Муковоз
6
Добрый день, после перехода с версии 1.2 на 2.10 werf cleanup начал удалять использующиеся теги, и до и после обновления использовались дефолтные политики keepPolicies Подскаж...
Дмитрий
29
Блин а мне как поумнеть ?
Toxin
191
Друзья, добрый день. Прошу подсказать с базовым вопросом по использованию CI переменных gitlab в werf.yaml. Хочу в beforeInstall использовать env переменную с токеном. Мне нуж...
Anton Zol
10
Вопросик не совсем werf. Но вдруг мы подскажите воркэраунд или ещё что-нибудь. Могу ли я как-нибудь в моменте деплоя внутри heml рендера получить хэшсумму файла шаблона (./tem...
Alex Подрябинкин
11
Карта сайта