лежит?
public Optional<Map<TopicPartition, Long>> getOffsets(String topicName) { try (KafkaConsumer<String, String> consumer = createConsumer()) { List<TopicPartition> allPartitions = consumer.partitionsFor(topicName).stream() .map(info -> new TopicPartition(info.topic(), info.partition())) .collect(toList()); consumer.assign(allPartitions); Map<TopicPartition, Long> map = consumer.endOffsets(allPartitions); return Optional.of(map); } }
Обсуждают сегодня