свой consumes-метод:
@kafka_app.consumes(topic="example_topic", auto_offset_reset="earliest", enable_auto_commit=True)
async def on_example_topic(msg: dict, meta: EventMetadata):
print(f"Got message: {msg}")
print(f"Got meta: {meta}")
print("---")
И заметил странное: что он всегда начинает прокручивать сообщения в очереди, начиная с offset=0, даже если при предыдущих запусках сервера эти сообщения уже обрабатывались.
Как бы это забороть?
выкинуть говно на питоне и взять нормальную либу для спарка на scala, которая умеет в 100 раз больше. быстрее и лучше
А как groupId задаёте?
Хех, задал я group_id в декораторе — и он таки перестал обрабатывать эти сообщения по второму кругу. Радостно!
Стоит изучить получше как Kafka работает, потому что там много разных нюансов на которые можно напороться. Кафка требует глубокого понимания своего внутреннего устройства. Иначе имеет смысл смотреть в сторону более простых решений вроде AMQP
Боюсь, вы слишком хорошего обо мне мнения = )
ну если вы собираетесь делать высоконагруженное приложение, то выборов не много
Так ведь опция auto_offset_reset="earliest" предписывает читать топик с начала
Ненене, больших нагрузок здесь не будет. Просто Кафка уже установлена, и ей можно пользоваться.
Тут есть тонкость: когда я пользовался простым consumer'ом из kafka-python, то там при earliest-политике пропускались уже закоммиченные сообщения. И я думаю, как мне добиться такого же поведения и здесь. Причём так, чтобы коммитить сообщение только тогда, когда оно корректно обработано. А если некорректно, то дальше по партиции не ходить — и спамить в логи сообщениями об ошибках.
Попробуйте auto_offset_reset="latest"
Можно выключить автокоммит и коммитеть самому синхронно или асинхронно.
По-моему, это имеет значение только тогда, когда коммитов не было.
Это было бы очень здорово. Пользуясь kafka-python, я так и делал. А как бы такое сделать в fastkafka?
Если нужен клиент на Python, посмотрите на вариант от Confluent.
https://docs.confluent.io/kafka-clients/python/current/overview.html
Видимо простой консумер указывал какую-то группу, а ваш новый нет. Почитайте про группы консумеров в кафке. Видимо между запусками чистится группа или же консумер каждый раз берет новую группу себе.
Кстати, а что бы лучше почитать по теории работы Кафки?
Confluent, официальную доку или Strimzi
https://www.piter.com/collection/all/product/apache-kafka-potokovaya-obrabotka-i-analiz-dannyh-2-e-izdanie
https://www.gentlydownthe.stream/
Обсуждают сегодня