Братцы, а не работал ли кто с библиотекой fastkafka? Я создал

свой consumes-метод:

@kafka_app.consumes(topic="example_topic", auto_offset_reset="earliest", enable_auto_commit=True)
async def on_example_topic(msg: dict, meta: EventMetadata):
print(f"Got message: {msg}")
print(f"Got meta: {meta}")
print("---")

И заметил странное: что он всегда начинает прокручивать сообщения в очереди, начиная с offset=0, даже если при предыдущих запусках сервера эти сообщения уже обрабатывались.
Как бы это забороть?

20 ответов

12 просмотров

выкинуть говно на питоне и взять нормальную либу для спарка на scala, которая умеет в 100 раз больше. быстрее и лучше

А как groupId задаёте?

Felix-Neko Автор вопроса
Anatoliy
А как groupId задаёте?

Хех, задал я group_id в декораторе — и он таки перестал обрабатывать эти сообщения по второму кругу. Радостно!

Felix Neko
Хех, задал я group_id в декораторе — и он таки пер...

Стоит изучить получше как Kafka работает, потому что там много разных нюансов на которые можно напороться. Кафка требует глубокого понимания своего внутреннего устройства. Иначе имеет смысл смотреть в сторону более простых решений вроде AMQP

Felix-Neko Автор вопроса
Старый Хрыч
выкинуть говно на питоне и взять нормальную либу д...

Боюсь, вы слишком хорошего обо мне мнения = )

Felix Neko
Боюсь, вы слишком хорошего обо мне мнения = )

ну если вы собираетесь делать высоконагруженное приложение, то выборов не много

Так ведь опция auto_offset_reset="earliest" предписывает читать топик с начала

Felix-Neko Автор вопроса
Старый Хрыч
ну если вы собираетесь делать высоконагруженное п...

Ненене, больших нагрузок здесь не будет. Просто Кафка уже установлена, и ей можно пользоваться.

Felix-Neko Автор вопроса
Vadim Zaigrin
Так ведь опция auto_offset_reset="earliest" предпи...

Тут есть тонкость: когда я пользовался простым consumer'ом из kafka-python, то там при earliest-политике пропускались уже закоммиченные сообщения. И я думаю, как мне добиться такого же поведения и здесь. Причём так, чтобы коммитить сообщение только тогда, когда оно корректно обработано. А если некорректно, то дальше по партиции не ходить — и спамить в логи сообщениями об ошибках.

Felix Neko
Тут есть тонкость: когда я пользовался простым con...

Можно выключить автокоммит и коммитеть самому синхронно или асинхронно.

Felix-Neko Автор вопроса
Vadim Zaigrin
Попробуйте auto_offset_reset="latest"

По-моему, это имеет значение только тогда, когда коммитов не было.

Felix-Neko Автор вопроса

Это было бы очень здорово. Пользуясь kafka-python, я так и делал. А как бы такое сделать в fastkafka?

Felix Neko
Это было бы очень здорово. Пользуясь kafka-python,...

Если нужен клиент на Python, посмотрите на вариант от Confluent.

Видимо простой консумер указывал какую-то группу, а ваш новый нет. Почитайте про группы консумеров в кафке. Видимо между запусками чистится группа или же консумер каждый раз берет новую группу себе.

Felix-Neko Автор вопроса
Anatoliy
Видимо простой консумер указывал какую-то группу, ...

Кстати, а что бы лучше почитать по теории работы Кафки?

Felix Neko
Кстати, а что бы лучше почитать по теории работы К...

https://www.piter.com/collection/all/product/apache-kafka-potokovaya-obrabotka-i-analiz-dannyh-2-e-izdanie

Похожие вопросы

Обсуждают сегодня

Всем привет) Я попробовал турбо роутер октябрьский. Вроде доволен, но возникла проблемка) Бутстраповские модалки плодят .modal-backdrop элементы Если модалка открыта, должне...
Виталий
3
Так а кто может спарсить всех участников чата? Идишники
Magic
18
да пофиг на капчу зашел в чат и молчишь при этом ты нонейм? пошел вон
Magic
17
Как удалить health check в Consul? Казалось бы, это должно быть не сложно, но я не могу найти в документации ничего про это, только про добавление service с health check "в н...
Roman
2
Привет, кто может сделать юзербота с апи? Задачи: - создавать группы - создавать каналы - задавать для созданных каналов аватарку или эмоджи, имя группы - добавлять в группы...
Lencore
13
Хотя вроде админка показывает удаленные модели, да? @dblackCat
Виталий
2
Privet! Mozhet jesti ideji - nemogu sdelatj upload backup s filestore cerez WEB. Fail okolo 450mb, eto mozhet bitj prichinoi? Nemogu ponjatj..kak zagruzitj backup... Poluchaju...
Matiss 🤘 Black Oak IT 🌳 Batumi 🌴 Latvija
5
Нужно магазин с тильды на опен кат перенести Есть кто умеет? В лс
Magic
8
Всем доброго вечера! Хочу поделиться своим злоключением с человеком, который, как оказалось сюда тоже скидывал свое резюме. Жаль, что я вашу группу не нашел раньше… человек ки...
Роман Ахмедзянов
4
А кто знает в тейлоре до сих пор есть конфликты слагов или поправили уже?
Black Cat
5
Карта сайта