Братцы, а не работал ли кто с библиотекой fastkafka? Я создал

свой consumes-метод:

@kafka_app.consumes(topic="example_topic", auto_offset_reset="earliest", enable_auto_commit=True)
async def on_example_topic(msg: dict, meta: EventMetadata):
print(f"Got message: {msg}")
print(f"Got meta: {meta}")
print("---")

И заметил странное: что он всегда начинает прокручивать сообщения в очереди, начиная с offset=0, даже если при предыдущих запусках сервера эти сообщения уже обрабатывались.
Как бы это забороть?

20 ответов

42 просмотра

выкинуть говно на питоне и взять нормальную либу для спарка на scala, которая умеет в 100 раз больше. быстрее и лучше

А как groupId задаёте?

Felix-Neko Автор вопроса
Anatoliy
А как groupId задаёте?

Хех, задал я group_id в декораторе — и он таки перестал обрабатывать эти сообщения по второму кругу. Радостно!

Felix Neko
Хех, задал я group_id в декораторе — и он таки пер...

Стоит изучить получше как Kafka работает, потому что там много разных нюансов на которые можно напороться. Кафка требует глубокого понимания своего внутреннего устройства. Иначе имеет смысл смотреть в сторону более простых решений вроде AMQP

Felix-Neko Автор вопроса
Старый Хрыч
выкинуть говно на питоне и взять нормальную либу д...

Боюсь, вы слишком хорошего обо мне мнения = )

Felix Neko
Боюсь, вы слишком хорошего обо мне мнения = )

ну если вы собираетесь делать высоконагруженное приложение, то выборов не много

Так ведь опция auto_offset_reset="earliest" предписывает читать топик с начала

Felix-Neko Автор вопроса
Старый Хрыч
ну если вы собираетесь делать высоконагруженное п...

Ненене, больших нагрузок здесь не будет. Просто Кафка уже установлена, и ей можно пользоваться.

Felix-Neko Автор вопроса
Vadim Zaigrin
Так ведь опция auto_offset_reset="earliest" предпи...

Тут есть тонкость: когда я пользовался простым consumer'ом из kafka-python, то там при earliest-политике пропускались уже закоммиченные сообщения. И я думаю, как мне добиться такого же поведения и здесь. Причём так, чтобы коммитить сообщение только тогда, когда оно корректно обработано. А если некорректно, то дальше по партиции не ходить — и спамить в логи сообщениями об ошибках.

Felix Neko
Тут есть тонкость: когда я пользовался простым con...

Можно выключить автокоммит и коммитеть самому синхронно или асинхронно.

Felix-Neko Автор вопроса
Vadim Zaigrin
Попробуйте auto_offset_reset="latest"

По-моему, это имеет значение только тогда, когда коммитов не было.

Felix-Neko Автор вопроса

Это было бы очень здорово. Пользуясь kafka-python, я так и делал. А как бы такое сделать в fastkafka?

Felix Neko
Это было бы очень здорово. Пользуясь kafka-python,...

Если нужен клиент на Python, посмотрите на вариант от Confluent.

Видимо простой консумер указывал какую-то группу, а ваш новый нет. Почитайте про группы консумеров в кафке. Видимо между запусками чистится группа или же консумер каждый раз берет новую группу себе.

Felix-Neko Автор вопроса
Anatoliy
Видимо простой консумер указывал какую-то группу, ...

Кстати, а что бы лучше почитать по теории работы Кафки?

Felix Neko
Кстати, а что бы лучше почитать по теории работы К...

https://www.piter.com/collection/all/product/apache-kafka-potokovaya-obrabotka-i-analiz-dannyh-2-e-izdanie

Похожие вопросы

Обсуждают сегодня

Добрый вечер, Пока не совсем понимаю как наладить общение между телеграм ботом и ПО для работы с сим боксом. По самому боту так понял: - Нужен некий баланс, который можно поп...
Magic
6
сделал сайт, прикрутил в боте сайт, и виджет логина. как автоматически логинить пользователя в аккаунт(телеграм), при входе с бота?
Александра Чернивецкая
5
Объясните, пожалуйста, почему компилятор ругается на использование в условии неинициализированной переменной: int x; Task.Run(async () => { x = await somefunc(); }).Wait...
Александр
5
Ребят, подскажите, пожалуйста, почему в префиксе к ассетам, которые генерируются через фильтр | theme в шаблоне, стал вдруг появляться index.php? Вот так выглядит ссылка на а...
Виталий
1
Всем привет. Ребята, подскажите, пожалуйста. у ботов есть ограничение на отправку сообщений - 30 сообщений в секунду, эти ограничения накладываются на все сообщения? или на со...
Artem Stormageddon
4
Блин, ребята, сори за тупые вопросы. А можно ли как-то открыть вебапку по нажатию на кнопку в меню(которое появляется слева, команды)?
Artem Stormageddon
3
а плаксы из-под питона умеют только в комфортных условиях что-то выдавить из себя?)
Lencore
9
Но, может, есть уже проверенная? Наши требования такие: 1. Сообщения должны приходить из Инста в CRM оду 2. Должна быть возможность подключить несколько экаунтов Инстаграм. Р...
Alexander Sharoiko MSE / Александр Шаройко
13
Это может быть все-таки не флудвейт? у меня ботфазер принимает изменения и отображает даже что они изменились, на видео видно что он прислал якобы уже измененное описание, н...
OVERLINK
13
Коллеги, может знает кто, можно ли цвет бейджа счётчика в BackendMenu менять без бубнов?
Alex Blaze
3
Карта сайта