/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.controlcenter;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.confluent.controlcenter.BootstrapClientSupplier;
import io.confluent.controlcenter.ControlCenterConfig;
import io.confluent.controlcenter.ControlCenterConfigModule;
import io.confluent.controlcenter.Rollup;
import io.confluent.controlcenter.util.TopicInfo;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.StreamsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaHelper {
    private static final Logger log = LoggerFactory.getLogger(KafkaHelper.class);
    private static final long TIMEOUT = TimeUnit.SECONDS.toMillis(15L);
    private final AdminClient adminClient;
    private final long retrySleepMs;

    /**
     * Creates a helper that pauses for {@link #TIMEOUT} milliseconds between retries.
     *
     * @param adminClient admin client used for all topic operations
     */
    public KafkaHelper(final AdminClient adminClient) {
        this(adminClient, TIMEOUT);
    }

    /**
     * Creates a helper with an explicit pause between retries.
     *
     * @param adminClient  admin client used for all topic operations
     * @param retrySleepMs milliseconds to sleep between retry attempts
     */
    public KafkaHelper(final AdminClient adminClient, final long retrySleepMs) {
        this.retrySleepMs = retrySleepMs;
        this.adminClient = adminClient;
    }

    /**
     * Tells whether the described topic has exactly the expected number of partitions.
     *
     * @param topicDescription description whose partition list is counted
     * @param partitions       expected partition count
     * @return {@code true} when the actual count equals {@code partitions}
     */
    public boolean checkPartitions(TopicDescription topicDescription, int partitions) {
        final int actualPartitions = topicDescription.partitions().size();
        return actualPartitions == partitions;
    }

    /**
     * Tells whether a partition has at least the required number of replicas.
     *
     * @param topicPartitionInfo partition whose replica list is counted
     * @param replicas           minimum acceptable replica count
     * @return {@code true} when the partition has {@code replicas} or more replicas
     */
    public boolean checkReplicas(TopicPartitionInfo topicPartitionInfo, int replicas) {
        final int actualReplicas = topicPartitionInfo.replicas().size();
        return replicas <= actualReplicas;
    }

    /**
     * Tells whether a partition currently has at least {@code minIsr} in-sync replicas.
     *
     * @param topicPartitionInfo partition whose ISR list is counted
     * @param minIsr             minimum acceptable number of in-sync replicas
     * @return {@code true} when the ISR size is {@code minIsr} or more
     */
    public boolean checkMinIsr(TopicPartitionInfo topicPartitionInfo, int minIsr) {
        final int inSyncReplicas = topicPartitionInfo.isr().size();
        return minIsr <= inSyncReplicas;
    }

    /**
     * Computes the retention for a topic: a base retention plus one extra day.
     *
     * <p>The base is {@code retentionMs} unless the topic name contains the name of a
     * {@link Rollup} constant, in which case that rollup's retain time is used. When several
     * rollup names occur in the topic name, the last match in {@code Rollup.values()} order wins.
     *
     * @param topic       topic name to inspect
     * @param retentionMs fallback base retention in milliseconds
     * @return base retention plus one day, in milliseconds
     */
    static long compactedTopicRetention(String topic, long retentionMs) {
        long baseRetentionMs = retentionMs;
        for (Rollup candidate : Rollup.values()) {
            if (topic.contains(candidate.name())) {
                baseRetentionMs = candidate.getRetainMillis();
            }
        }
        final long oneDayMs = TimeUnit.DAYS.toMillis(1L);
        return baseRetentionMs + oneDayMs;
    }

    /**
     * Validates an existing topic's description against the expectations in {@code topicInfo}.
     *
     * <p>When {@code topicInfo.validateConfig} is {@code false} nothing is checked and the topic is
     * accepted as-is. Otherwise the partition count, each partition's replica count and each
     * partition's in-sync replica count are checked; every violation is logged so that a single
     * run reports all problems rather than only the first one.
     *
     * @param topicInfo        expected settings for the topic
     * @param topicDescription what the cluster reports for the topic
     * @return {@code true} when no violation was found
     */
    private boolean topicIsValid(TopicInfo topicInfo, TopicDescription topicDescription) {
        log.info("checking topicDescription={}", topicDescription);
        final String topicName = topicDescription.name();
        final List<TopicPartitionInfo> partitions = topicDescription.partitions();

        if (!topicInfo.validateConfig) {
            // Validation is switched off: do not touch the expected config at all, so a TopicInfo
            // without a min.insync.replicas entry cannot fail a check that was never requested.
            log.info("found topic={} with partitions={}", topicName, partitions.size());
            return true;
        }

        boolean topicInError = false;
        if (!this.checkPartitions(topicDescription, topicInfo.partitions)) {
            log.error("found topic={} with partitions={} instead of expectedPartitions={}", topicName, partitions.size(), topicInfo.partitions);
            topicInError = true;
        } else {
            log.info("found topic={} with partitions={}", topicName, partitions.size());
        }
        if (partitions.isEmpty()) {
            // No partitions to inspect, so the min ISR setting is never needed.
            return !topicInError;
        }

        // The expected min ISR is the same for every partition; read and parse it once.
        final int minIsr = Integer.parseInt(topicInfo.config.get("min.insync.replicas").value());
        for (TopicPartitionInfo topicPartitionInfo : partitions) {
            if (!this.checkReplicas(topicPartitionInfo, topicInfo.replication)) {
                log.error("found topic={} with replication={} expecting at least expectedReplication={}", topicName, topicPartitionInfo.replicas().size(), topicInfo.replication);
                topicInError = true;
            }
            if (!this.checkMinIsr(topicPartitionInfo, minIsr)) {
                log.error("found topic={} with isr={} below minIsr={}", topicName, topicPartitionInfo.isr().size(), minIsr);
                topicInError = true;
            }
        }
        return !topicInError;
    }

    /**
     * Describes a single topic and validates it against {@code topicInfo}.
     *
     * <p>The describe call is attempted up to three times. A timeout or an empty describe result
     * consumes one attempt; the first description that is returned decides the outcome.
     *
     * @param topicInfo expected settings for the topic
     * @return {@code true} when the topic was described and passed validation; {@code false} when
     *         it failed validation or could not be described within three attempts
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the describe call fails
     */
    private boolean topicIsValid(TopicInfo topicInfo) throws InterruptedException, ExecutionException {
        int timeoutRetries = 3;
        while (timeoutRetries-- > 0) {
            try {
                Map<String, TopicDescription> described = this.adminClient.describeTopics(Collections.singleton(topicInfo.name)).all().get(TIMEOUT, TimeUnit.MILLISECONDS);
                Iterator<TopicDescription> topicIterator = described.values().iterator();
                if (topicIterator.hasNext()) {
                    return this.topicIsValid(topicInfo, topicIterator.next());
                }
                log.error("Expecting topic in describe-topics call. topic={}", topicInfo);
            }
            catch (TimeoutException timeoutException) {
                // Keep retrying, but leave a trace: otherwise repeated timeouts end in a bare
                // "false" with nothing in the log to explain it.
                log.warn("Timed out describing topic={}; will retry while attempts remain", topicInfo, timeoutException);
            }
        }
        return false;
    }

    /**
     * Makes sure every topic in {@code topics} exists on the cluster and, where requested, matches
     * its expected settings.
     *
     * <p>Topics that already exist are validated with
     * {@link #topicIsValid(TopicInfo, TopicDescription)}; if any of them is invalid nothing is
     * created and {@code false} is returned. Missing topics are then created one at a time and
     * finally described again to confirm that they are visible.
     *
     * @param topics the topics that must exist
     * @return {@code true} when all existing topics are valid and every missing topic was created
     *         and confirmed
     * @throws InterruptedException if the calling thread is interrupted while waiting or sleeping
     * @throws ExecutionException   if listing or describing topics fails
     * @throws TimeoutException     if listing or describing the existing topics exceeds the timeout
     */
    private boolean checkCreateTopics(Set<TopicInfo> topics) throws InterruptedException, ExecutionException, TimeoutException {
        final long timeToWait = TIMEOUT;
        final Map<String, TopicInfo> topicInfoMap = new HashMap<>();
        for (TopicInfo ti : topics) {
            topicInfoMap.put(ti.name, ti);
        }

        ListTopicsResult ltr = this.adminClient.listTopics();
        Set<TopicListing> topicListings = new HashSet<>();
        Set<String> listedNames = new HashSet<>();
        for (TopicListing listing : ltr.listings().get(timeToWait, TimeUnit.MILLISECONDS)) {
            if (topicInfoMap.containsKey(listing.name())) {
                topicListings.add(listing);
                listedNames.add(listing.name());
            }
        }
        log.info("topicListings={}", topicListings);

        Set<String> missingTopics = new HashSet<>(topicInfoMap.keySet());
        missingTopics.removeAll(listedNames);
        log.info("missingTopics={}", missingTopics);

        // Bounded wait, like every other admin-client call in this method.
        Set<String> extantTopics = new HashSet<>(topicInfoMap.keySet());
        extantTopics.retainAll(ltr.names().get(timeToWait, TimeUnit.MILLISECONDS));
        log.info("extantTopics={}", extantTopics);

        Collection<TopicDescription> topicDescriptions = this.adminClient.describeTopics(extantTopics).all().get(timeToWait, TimeUnit.MILLISECONDS).values();
        boolean topicInError = false;
        // Deliberately no early exit: validate every topic so all problems get logged.
        for (TopicDescription topicDescription : topicDescriptions) {
            if (!this.topicIsValid(topicInfoMap.get(topicDescription.name()), topicDescription)) {
                topicInError = true;
            }
        }
        if (topicInError) {
            return false;
        }

        for (String topic : missingTopics) {
            if (!this.createTopicWithRetries(topicInfoMap.get(topic))) {
                return false;
            }
        }
        return this.confirmCreatedTopics(missingTopics, topicInfoMap);
    }

    /**
     * Tries up to three times to create one topic.
     *
     * @return {@code true} when the topic was created, turned out to exist after a timeout, or
     *         already existed and is acceptable
     */
    private boolean createTopicWithRetries(TopicInfo topicInfo) throws InterruptedException, ExecutionException {
        int retries = 3;
        while (retries-- > 0) {
            try {
                log.trace("creating topic={}", topicInfo);
                CreateTopicsResult ctr = this.adminClient.createTopics(Collections.singleton(topicInfo.toNewTopic()));
                ctr.all().get(TIMEOUT, TimeUnit.MILLISECONDS);
                log.debug("create=attempt topic={}", topicInfo);
                return true;
            }
            catch (TimeoutException te) {
                log.debug("TimeoutException while creating topic {}", topicInfo);
                // The request may still have gone through; only report a failure if the topic
                // really is not there.
                if (this.awaitTopicVisible(topicInfo)) {
                    return true;
                }
                log.error("attempt=failed to create topic={}", topicInfo, te);
            }
            catch (ExecutionException ee) {
                if (ee.getCause() instanceof TopicExistsException) {
                    if (!topicInfo.validateConfig) {
                        log.debug("Topic topic={} already created, using the existing one", topicInfo);
                        return true;
                    }
                    log.error("Race condition trying to create topic={}", topicInfo, ee.getCause());
                    return this.topicIsValid(topicInfo);
                }
                log.error("attempt=failed to create topic={}", topicInfo, ee.getCause());
            }
            catch (InterruptedException ie) {
                // Must not be absorbed by the catch-all below: an interrupt is a request to stop,
                // not a failed attempt to retry.
                throw ie;
            }
            catch (Throwable e) {
                log.error("attempt=failed to create topic={}", topicInfo, e);
            }
        }
        return false;
    }

    /**
     * After a create request timed out, describes the topic up to three times to see whether it
     * exists anyway, sleeping {@code retrySleepMs} after each empty result.
     */
    private boolean awaitTopicVisible(TopicInfo topicInfo) throws InterruptedException, ExecutionException {
        int timeoutRetries = 3;
        while (timeoutRetries-- > 0) {
            try {
                Collection<TopicDescription> createdDescriptions = this.adminClient.describeTopics(Collections.singleton(topicInfo.name)).all().get(TIMEOUT, TimeUnit.MILLISECONDS).values();
                if (!createdDescriptions.isEmpty()) {
                    return true;
                }
                Thread.sleep(this.retrySleepMs);
            }
            catch (TimeoutException ignored) {
                // The describe timed out as well; this simply uses up one attempt.
            }
        }
        return false;
    }

    /**
     * Describes the freshly created topics (up to three attempts) and removes each confirmed topic
     * from {@code missingTopics}.
     *
     * @return {@code false} only when the last attempt failed with an exception
     */
    private boolean confirmCreatedTopics(Set<String> missingTopics, final Map<String, TopicInfo> topicInfoMap) throws InterruptedException {
        boolean topicInError = false;
        int retries = 3;
        while (!missingTopics.isEmpty() && retries-- > 0) {
            try {
                log.info("describing topics={}", missingTopics);
                Collection<TopicDescription> createdDescriptions = this.adminClient.describeTopics(ImmutableList.copyOf(missingTopics)).all().get(TIMEOUT, TimeUnit.MILLISECONDS).values();
                for (TopicDescription tm : createdDescriptions) {
                    TopicInfo topicInfo = topicInfoMap.get(tm.name());
                    Preconditions.checkArgument(missingTopics.contains(tm.name()), (Object)("incorrect topic=" + tm.name() + " returned"));
                    Preconditions.checkArgument(!topicInfo.validateConfig || this.checkPartitions(tm, topicInfo.partitions), (Object)("incorrect partitions topic=" + tm.name()));
                    log.info("create=success topic={}", topicInfo);
                    missingTopics.remove(tm.name());
                }
            }
            catch (InterruptedException ie) {
                throw ie;
            }
            catch (Throwable e) {
                topicInError = retries < 1;
                if (topicInError) {
                    log.error("create=failed missingTopics={}", Iterables.transform(missingTopics, new Function<String, Object>(){

                        @Nullable
                        public Object apply(@Nullable String input) {
                            return topicInfoMap.get(input);
                        }
                    }), e);
                    continue;
                }
                // An interrupt during the pause propagates instead of being dropped.
                Thread.sleep(this.retrySleepMs);
            }
        }
        return !topicInError;
    }

    /**
     * Deletes a single topic and waits, without a timeout, for the deletion to complete.
     *
     * @param topic name of the topic to delete
     * @throws Throwable the cause of the failure when the deletion fails, or an
     *                   {@link InterruptException} wrapping the interrupt when the wait is interrupted
     */
    public void deleteTopic(String topic) throws Throwable {
        final Set<String> topicToDelete = Collections.singleton(topic);
        try {
            this.adminClient.deleteTopics(topicToDelete).all().get();
        }
        catch (ExecutionException failure) {
            // Surface the underlying error rather than the ExecutionException wrapper.
            throw failure.getCause();
        }
        catch (InterruptedException interrupted) {
            throw new InterruptException(interrupted);
        }
    }

    /**
     * Works out, per partition of {@code topic}, the offset the consumer group should move to so
     * that it starts at {@code timestamp}.
     *
     * <p>The consumer is assigned all partitions of the topic. For a partition with an offset at
     * the timestamp, that offset is included only when it is ahead of the committed offset (a
     * missing commit counts as 0). For a partition without an offset at the timestamp, the
     * consumer is sought to the end and that end position is included.
     *
     * @param consumer  consumer used for the lookups; its assignment and position are changed
     * @param topic     topic to inspect
     * @param timestamp timestamp to search offsets for
     * @return offsets to commit, keyed by partition; empty when the topic has no known partitions
     */
    public static <K, V> Map<TopicPartition, OffsetAndMetadata> offsetsForTimestamp(KafkaConsumer<K, V> consumer, String topic, long timestamp) {
        log.debug("check and maybe seek to offsets for topic={}", topic);
        final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        final Set<TopicPartition> assigned = KafkaHelper.partitionsForTopic(consumer, topic);
        if (assigned.isEmpty()) {
            log.warn("unable to find any topicPartitions for topic={}", topic);
            return offsetsToCommit;
        }
        consumer.assign(assigned);
        log.info("found {} topicPartitions for topic={}", assigned.size(), topic);

        final Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition partition : assigned) {
            query.put(partition, timestamp);
        }
        final Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query);

        final Set<TopicPartition> unresolved = new HashSet<>();
        for (TopicPartition partition : assigned) {
            final OffsetAndTimestamp hit = found.get(partition);
            if (hit == null) {
                log.debug("unable to get offset for topicPartition={} @ ts={}", partition.toString(), timestamp);
                unresolved.add(partition);
                continue;
            }
            final OffsetAndMetadata committed = consumer.committed(partition);
            final long committedOffset = committed != null ? committed.offset() : 0L;
            final long candidateOffset = hit.offset();
            log.debug("newOffset={} committedOffset={} @ ts={} for topicPartition={}", candidateOffset, committedOffset, timestamp, partition.toString());
            // Only ever move forward: a commit at or past the candidate is left alone.
            if (committedOffset < candidateOffset) {
                log.debug("will seek to new offset for {} @ ts={}, committed={} new={}", partition.toString(), timestamp, committedOffset, candidateOffset);
                offsetsToCommit.put(partition, new OffsetAndMetadata(candidateOffset));
            }
        }

        if (unresolved.isEmpty()) {
            return offsetsToCommit;
        }
        consumer.seekToEnd(unresolved);
        for (TopicPartition partition : unresolved) {
            try {
                final long endOffset = consumer.position(partition);
                log.debug("will seek to latest offset for {} @ ts={}, new={} ", partition, timestamp, endOffset);
                offsetsToCommit.put(partition, new OffsetAndMetadata(endOffset));
            }
            catch (InvalidOffsetException invalidOffset) {
                log.warn("Unable to determine latest offset for topicPartition={}", partition, invalidOffset);
            }
        }
        return offsetsToCommit;
    }

    /**
     * Looks up the partitions of {@code topic}, retrying while the consumer reports none.
     *
     * <p>Up to three lookups are made with a 50 ms pause between them. The pause is taken only
     * when another lookup will follow, so a successful lookup returns immediately. If the thread
     * is interrupted during a pause, the interrupt flag is restored and retrying stops.
     *
     * @param consumer consumer used to query partition metadata
     * @param topic    topic whose partitions are wanted
     * @return the topic's partitions; empty when none were reported
     */
    public static <K, V> Set<TopicPartition> partitionsForTopic(KafkaConsumer<K, V> consumer, String topic) {
        final int maxAttempts = 3;
        final long retryPauseMs = 50L;

        List<PartitionInfo> partitions = null;
        for (int attempt = 1; attempt <= maxAttempts; ++attempt) {
            partitions = consumer.partitionsFor(topic);
            boolean found = partitions != null && !partitions.isEmpty();
            if (found || attempt == maxAttempts) {
                break;
            }
            try {
                Thread.sleep(retryPauseMs);
            }
            catch (InterruptedException interrupted) {
                // Keep the interrupt visible to the caller and stop waiting.
                Thread.currentThread().interrupt();
                break;
            }
        }

        Set<TopicPartition> topicPartitions = new HashSet<>();
        if (partitions != null) {
            for (PartitionInfo partitionInfo : partitions) {
                topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
            }
        }
        return topicPartitions;
    }

    public static class ControlCenterPreconditions
    implements Callable<Boolean> {
        private final ControlCenterConfig config;
        private final StreamsConfig streamsConfig;
        private final BootstrapClientSupplier bootstrapClient;
        private final Set<TopicInfo> topics;

        /**
         * Injection constructor; stores its collaborators without further processing.
         *
         * @param config          Control Center configuration
         * @param streamsConfig   Kafka Streams configuration
         * @param bootstrapClient supplier of admin clients for the bootstrap cluster
         * @param topics          topics bound under {@code @ControlTopics}
         */
        @Inject
        public ControlCenterPreconditions(final ControlCenterConfig config, final StreamsConfig streamsConfig, final BootstrapClientSupplier bootstrapClient, @ControlCenterConfigModule.ControlTopics final Set<TopicInfo> topics) {
            this.topics = topics;
            this.bootstrapClient = bootstrapClient;
            this.streamsConfig = streamsConfig;
            this.config = config;
        }

        /**
         * Counts the broker nodes reported by a describe-cluster call.
         *
         * @param result pending describe-cluster result
         * @return number of nodes, or 0 when the node collection is {@code null}
         * @throws InterruptedException if interrupted while waiting
         * @throws ExecutionException   if the describe-cluster call failed
         * @throws TimeoutException     if the nodes are not available within {@code TIMEOUT}
         */
        int availableBrokers(DescribeClusterResult result) throws InterruptedException, ExecutionException, TimeoutException {
            final Collection<?> brokers = result.nodes().get(TIMEOUT, TimeUnit.MILLISECONDS);
            return brokers == null ? 0 : brokers.size();
        }

        /**
         * Treats the cluster as compatible when it reports a cluster id.
         *
         * @param result pending describe-cluster result
         * @return {@code true} when the cluster id is non-null; otherwise logs an error and
         *         returns {@code false}
         * @throws InterruptedException if interrupted while waiting
         * @throws ExecutionException   if the describe-cluster call failed
         * @throws TimeoutException     if the id is not available within {@code TIMEOUT}
         */
        boolean isBrokerCompatible(DescribeClusterResult result) throws InterruptedException, ExecutionException, TimeoutException {
            final String clusterId = result.clusterId().get(TIMEOUT, TimeUnit.MILLISECONDS);
            final boolean compatible = clusterId != null;
            if (!compatible) {
                log.error("broker version needs to be 0.10.1.x or higher");
            }
            return compatible;
        }

        /**
         * Commits start offsets for the monitoring-interceptor topic and the metrics topic so that
         * backlog older than each topic's configured skip window is skipped.
         *
         * <p>A short-lived byte-array consumer is used, with the Streams application id as its
         * group id and auto-commit disabled. Failures while seeking are logged as a warning and
         * not rethrown.
         */
        void seekToStartOffsetsForInputTopics() {
            final Map<String, Object> consumerConfig = this.streamsConfig.getRestoreConsumerConfigs("will-delete-this");
            final String applicationId = this.streamsConfig.getString("application.id");
            consumerConfig.putAll(ImmutableMap.of("group.id", applicationId, "enable.auto.commit", "false", "auto.offset.reset", "latest"));
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
                final long now = System.currentTimeMillis();

                final long interceptorSkipMs = TimeUnit.MINUTES.toMillis(this.config.getLong("confluent.monitoring.interceptor.topic.skip.backlog.minutes"));
                final long interceptorRetentionMs = this.config.getLong("confluent.monitoring.interceptor.topic.retention.ms");
                final String interceptorTopic = this.config.getString("confluent.monitoring.interceptor.topic");
                this.seekOffsets(consumer, now, interceptorSkipMs, interceptorRetentionMs, interceptorTopic);

                final long metricsSkipMs = TimeUnit.MINUTES.toMillis(this.config.getLong("confluent.metrics.topic.skip.backlog.minutes"));
                final long metricsRetentionMs = this.config.getLong("confluent.metrics.topic.retention.ms");
                final String metricsTopic = this.config.getString("confluent.metrics.topic");
                this.seekOffsets(consumer, now, metricsSkipMs, metricsRetentionMs, metricsTopic);
            }
            catch (Exception seekFailure) {
                log.warn("failed seeking to new offsets", seekFailure);
            }
        }

        /**
         * Commits offsets that make the group start {@code skipBacklogMs} before {@code now},
         * but only when that window is shorter than the topic's retention.
         *
         * @param consumer       consumer used for the lookup and the commit
         * @param now            reference time in epoch milliseconds
         * @param skipBacklogMs  how far back from {@code now} to start, in milliseconds
         * @param topicRetention retention of the topic, in milliseconds
         * @param topic          topic whose offsets are set
         */
        private <K, V> void seekOffsets(KafkaConsumer<K, V> consumer, long now, long skipBacklogMs, long topicRetention, String topic) {
            if (skipBacklogMs >= topicRetention) {
                return;
            }
            log.info("Setting offsets for topic={}", topic);
            final long startTimestamp = now - skipBacklogMs;
            final Map<TopicPartition, OffsetAndMetadata> offsets = KafkaHelper.offsetsForTimestamp(consumer, topic, startTimestamp);
            consumer.commitSync(offsets);
        }

        /**
         * Runs the start-up preconditions: waits for a compatible cluster with enough brokers,
         * then creates or validates the required topics and seeks the input topics.
         *
         * <p>The cluster check is attempted up to four times. An incompatible broker or an
         * interrupt ends the check immediately with {@code false}; too few brokers, an execution
         * failure or a timeout is logged and consumes one attempt.
         *
         * @return {@code true} when the cluster is ready and all topics were created or validated
         * @throws Exception if topic creation or validation throws, or closing an admin client fails
         */
        @Override
        public Boolean call() throws Exception {
            final int internalReplication = this.config.getInt("confluent.controlcenter.internal.topics.replication");
            final int maxTries = 4;
            boolean clusterReady = false;
            for (int tries = 0; tries < maxTries && !clusterReady; ++tries) {
                // try-with-resources closes the client on every path and lets exceptions from the
                // cluster checks reach the handlers below.
                try (AdminClient primaryClient = this.bootstrapClient.get()) {
                    DescribeClusterResult clusterResult = primaryClient.describeCluster();
                    if (!this.isBrokerCompatible(clusterResult)) {
                        return false;
                    }
                    int brokersAvailable = this.availableBrokers(clusterResult);
                    if (brokersAvailable < internalReplication) {
                        log.error("{} brokers are required but only found {}. Check the topic replication settings in the properties file or add more brokers to your cluster", internalReplication, brokersAvailable);
                        Thread.sleep(TimeUnit.SECONDS.toMillis(2L));
                        continue;
                    }
                    clusterReady = true;
                }
                catch (InterruptedException e) {
                    log.error("Startup interrupted", e);
                    Thread.currentThread().interrupt();
                    return false;
                }
                catch (ExecutionException e) {
                    log.error("Error occurred on start up.", e.getCause());
                }
                catch (TimeoutException e) {
                    log.error("Broker check failed. Ensure that Kafka is running on any of the hosts: {}", this.config.getList("bootstrap.servers"), e);
                }
            }
            if (!clusterReady) {
                return false;
            }

            final boolean createdTopics;
            try (AdminClient adminClient = this.bootstrapClient.get()) {
                KafkaHelper kafkaHelper = new KafkaHelper(adminClient);
                createdTopics = kafkaHelper.checkCreateTopics(this.topics);
            }
            if (createdTopics) {
                this.seekToStartOffsetsForInputTopics();
            } else {
                log.error("Startup failed to create necessary topics");
            }
            return createdTopics;
        }
    }
}

