autoCommit: false и в случае успешной обработки сообщения делаю consumer.commitOffsets. и все работает.
Непонятно что делать при неуспешной обработке, так чтобы через какое-то время консумер снова прочитал сообщение и попробовал обработать.
Пробовал:
1) Просто ждать. (вдруг оно из коробки так делает)
2) Делать commitOffsets с текущим офсетом сообщения (без +1)
3) Вызывать у консумера pause/resume.
Ничего не помогает, консумер просто висит..
Повторно получает сообщение только при рестарте консумера, но рестартить консумер и ребалансить кафку каждый раз чот прям неочень хочтеся...
Может подскажете куда копать?
Как вариант, делать ретраи(Главное таймаут не словить), но вопрос сложный, серебрянной пули тут нет. Можно сразу кидать в dlq, можно действительно делать что-то вроде ручного backpressure через pause/resume.
Так он у вас его прочитал уже. Оно у вас уже есть. Зачем его по сети гонять ещё раз? Держите у себя и пробуйте обработать снова. Заново он его читать будет только действительно при реконнекте.
логично, можно и так... Просто думал отдать ретраи на сторону кафки, а не реализовывать логику ретраев внутри консумера. но наверное так будет правильнее.
Есть механизм ретраев через кафку, когда вы создаете отдельные топики для ретраев.
это я знаю, это на любых очередях можно реализовать...
Обсуждают сегодня