/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.controllers;

import io.confluent.kafkarest.common.KafkaFutures;
import io.confluent.kafkarest.controllers.LastProducedTimeManager;
import io.confluent.kafkarest.controllers.TopicManager;
import io.confluent.kafkarest.entities.LastProducedTime;
import io.confluent.kafkarest.entities.Topic;
import io.confluent.kafkarest.exceptions.TopicPartitionNotFoundException;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.jetbrains.annotations.NotNull;

/**
 * {@link LastProducedTimeManager} implementation that derives "last produced" timestamps by
 * asking the admin client for the {@link OffsetSpec#maxTimestamp()} offset of every partition of
 * the topics in question.
 */
final class LastProducedTimeManagerImpl
implements LastProducedTimeManager {
    private final TopicManager topicManager;
    private final ConfluentAdmin confluentAdminClient;

    /**
     * Creates the manager.
     *
     * @param confluentAdminClient admin client used to issue the {@code listOffsets} request; must not be null
     * @param topicManager used to resolve topics and their partitions; must not be null
     * @throws NullPointerException if either argument is null
     */
    @Inject
    LastProducedTimeManagerImpl(ConfluentAdmin confluentAdminClient, TopicManager topicManager) {
        this.topicManager = Objects.requireNonNull(topicManager);
        this.confluentAdminClient = Objects.requireNonNull(confluentAdminClient);
    }

    /**
     * Looks up a single topic and reports the largest timestamp found across its partitions.
     *
     * @param clusterId cluster the topic belongs to
     * @param topicName name of the topic to inspect
     * @return a future holding the {@link LastProducedTime} of the topic; its timestamp is
     *     {@code -1} when the offset lookup returned no entries. The future fails with a
     *     {@link TopicPartitionNotFoundException} when the topic manager reports no such topic.
     */
    @Override
    public CompletableFuture<LastProducedTime> getLastProducedTime(String clusterId, String topicName) {
        return this.topicManager.getTopic(clusterId, topicName).thenCompose(maybeTopic -> {
            Topic topic = maybeTopic.orElseThrow(() -> new TopicPartitionNotFoundException(clusterId, topicName, "Could not determine lastProducedTime for non-existent topic."));
            return this.getMaxTimestamps(clusterId, Collections.singletonList(topic))
                .thenApply(offsets -> LastProducedTime.create(clusterId, topicName, LastProducedTimeManagerImpl.maxTimestamp(offsets)));
        });
    }

    /**
     * Reports, for every topic returned by the topic manager, the largest timestamp found across
     * that topic's partitions.
     *
     * @param clusterId cluster whose topics are listed
     * @return a future holding one {@link LastProducedTime} per topic that appears in the offset
     *     lookup result; topics with no entry in that result are not included
     */
    @Override
    public CompletableFuture<List<LastProducedTime>> listLastProducedTimes(String clusterId) {
        return this.topicManager.listTopics(clusterId).thenCompose(topics ->
            this.getMaxTimestamps(clusterId, topics)
                .thenApply(offsets -> LastProducedTimeManagerImpl.toLastProducedTimes(clusterId, offsets)));
    }

    /**
     * Requests the {@link OffsetSpec#maxTimestamp()} offset for every partition of the given topics.
     *
     * @param clusterId currently unused by this method
     * @param topics topics whose partitions are queried
     * @return a future holding the lookup result keyed by topic partition
     */
    @NotNull
    private CompletableFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> getMaxTimestamps(String clusterId, List<Topic> topics) {
        Map<TopicPartition, OffsetSpec> offsetSpecs = topics.stream()
            .flatMap(topic -> topic.getPartitions().stream())
            .collect(Collectors.toMap(partition -> partition.toTopicPartition(), partition -> OffsetSpec.maxTimestamp()));
        ListOffsetsResult result = this.confluentAdminClient.listOffsets(offsetSpecs, new ListOffsetsOptions());
        return KafkaFutures.toCompletableFuture(result.all());
    }

    /** Returns the largest timestamp among the given lookup results, or {@code -1} if there are none. */
    private static long maxTimestamp(Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets) {
        return offsets.values().stream()
            .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::timestamp)
            .max()
            .orElse(-1L);
    }

    /** Collapses per-partition lookup results into one entry per topic carrying that topic's largest timestamp. */
    private static List<LastProducedTime> toLastProducedTimes(String clusterId, Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets) {
        // Merging with Long::max keeps only the largest timestamp seen for each topic name.
        Map<String, Long> maxTimestampByTopic = offsets.entrySet().stream()
            .collect(Collectors.toMap(entry -> entry.getKey().topic(), entry -> entry.getValue().timestamp(), Long::max));
        return maxTimestampByTopic.entrySet().stream()
            .map(entry -> LastProducedTime.create(clusterId, entry.getKey(), entry.getValue()))
            .collect(Collectors.toList());
    }
}

