запускаю его через Connect через класс org.apache.kafka.connect.mirror.MirrorSourceConnector. Должен ли сервис реплицировать оффсеты групп и коммитить их в consumer_offsets из исходного кластера в целевой? Я вижу следующие конфиги в коннекторах и они как будто должны запускать таску из https://github.com/apache/kafka/pull/7577/files# и https://issues.apache.org/jira/browse/KAFKA-9076 . Но никаких оффсетов не коммитится и таска как будто не стартует. Что я делаю не так? :)
sync.group.offsets.enabled = true
sync.group.offsets.interval.seconds = 60
emit.checkpoints.enabled = true
emit.checkpoints.interval.seconds = 60
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 1
groups = [.*]
groups.blacklist = null
groups.exclude = [console-consumer-.*, connect-.*, .*]
topics = [.*]
topics.blacklist = null
topics.exclude = [.*[\-\.]internal, .*\.replica, .*]
у нас 2.6, 2.7 наверно у следующего заказчика накатим, конфиг у нас такой clusters=city1,city2 city1.bootstrap.servers=server1:9092, server2:9092, server3:9092 city2.bootstrap.servers=server1:9092, server2:9092, server3:9092 city1->city2.enabled=true city2.producer.max.request.size=33554432 city2.producer.batch.size=25165824 city1->city2.topics=city1-.* city2->city1.enabled=false city2->city1.topics=city2-.* replication.factor=3 checkpoints.topic.replication.factor=3 heartbeats.topic.replication.factor=3 offset-syncs.topic.replication.factor=3 offset.storage.replication.factor=3 status.storage.replication.factor=3 config.storage.replication.factor=3 replication.policy.separator=. sync.topic.acls.enabled=false emit.checkpoints.interval.seconds=10 emit.heartbeats.interval.seconds=10 refresh.topics.enabled=true refresh.groups.enabled=true refresh.topics.interval.seconds=60 refresh.groups.interval.seconds=60 max.request.size=52428800
И оффсеты по группам реплицируются из исходного в целевой кластер?
кстати, не проверял, у нас была задача агрегировать все данные с разных городов для предоставления данных интеграторам с одной точки
Скорее всего не реплицируются, исходя из https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0 эта опция по дефолту выключена
Если кому-то интересно, чем закончилась история - таски не запускались, потому что их никто не запускал) Нужно было помимо этого коннектора MirrorSourceConnector.class запускать еще два, чтобы чекпоинты синкались)) MirrorHeartbeatConnector.class и MirrorCheckpointConnector.class
Как теперь выглядит конфиг в итоге?
Точно так же, как и был, просто теперь еще +2 коннектора добавил в кластер. Теперь все три мирроровских коннектора запущены и оффсеты засинкались. curl https://connect-cluster/connectors ["HeartbeatConnector","CheckpointConnector","SourceConnector"]
Обсуждают сегодня