консьюмер группы? через kafka-consumer-groups.sh у меня не получилось справится с аутентификацией между клиентом и сервером.
пробую на питоне скриптом
topic = 'my-topic'
consumer.subscribe([topic])
filtered_topics = consumer.list_topics()
partitions = filtered_topics.topics[topic].partitions
topic_partitions = list()
for part in partitions:
topic_partitions.append(TopicPartition(topic=topic, partition=part, offset=OFFSET_BEGINNING))
consumer.assign(topic_partitions)
for tp in topic_partitions:
consumer.seek(tp)
try:
msg = None
while msg is None:
msg = consumer.poll(timeout=1.0)
consumer.commit(offsets=topic_partitions, asynchronous=False)
finally:
consumer.close()
коммит выдаёт что cimpl.KafkaException: KafkaError{code=_NO_OFFSET,val=-168,str="Commit failed: Local: No offset stored"}
хотя я в akhq ui для кафки вижу что лаг появился, хотя не до конца, не тот который я ожидаю
через kafka admin api лучше, не через консюмер апи
не нашёл примера через admin api, всё стараются через on_assign сделать, например тут https://developer.confluent.io/get-started/python/?_ga=2.211577683.772978707.1678800149-871619111.1672400887#build-consumer я вроде решил свою проблему с помощью https://github.com/confluentinc/confluent-kafka-python/issues/201#issuecomment-330773567
ой, я ошибся в го я бы подключился клиентом и напрямую запрос в апи кафки сделал без консюмера и вот этого всего
Обсуждают сегодня