вручную если
config.Consumer.Offsets.AutoCommit.Enable = false
? Чем отличаются
session.MarkMessage
и
session.Commit
? Почему вызов Commit ни на что не влияет?
config.Consumer.Offsets.AutoCommit.Enable настройка отвечает за автокоммит помарченных оффсетов в фоне соответственно обычно эту настройку оставляют включенной, а обработанные оффсеты помечают через session.MarkMessage, чтобы потом в фоне они могли быть закомиченны
а Commit - сохраняет в брокер помеченные оффсеты, если вы ничего не пометили, то и сохранять нечего https://github.com/Shopify/sarama/blob/master/consumer_group.go#L519
Почему то если автокоммит выключен достаточно только помарчить оффсеты и при следующем запуске эти сообщения уже не придут
ну если вы сами Commit вызвали после того, как помарчили сообщения, то оно так и должно работать если не вызывали, то можно посмотреть в исходники, в Close вполне может быть явный вызов Commit а вообще какая у вас задача? зачем вам нужно имеено руками комитить, а не двигать оффсет?
в том то и дело что Commit не вызываю. Задача валить сервис при ошибке, чтобы при следующей итерации сервиса попробовать еще раз обработать сообщение, сейчас у нас логика реализована на либе confluent-kafka-go, но я присматриваюсь в сторону sarama, хочу переписать логику на эту либу, но смутил этот критичный момент с автокоммитом, притом что логика работы не понятна не только мне: https://github.com/Shopify/sarama/issues/1570
а в чем проблема не помечать оффсет, пока сообщение не будет обработано?
Да, там реально в close вызывается flushToBroker, спасибо за наводку
Обсуждают сегодня