/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.monitor;

import com.linkedin.cruisecontrol.common.utils.Utils;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.AggregatedMetricValues;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.Extrapolation;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.MetricValues;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityInfo;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Disk;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.SampleExtrapolation;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionEntity;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.ReplicaEntity;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.DescribeLogDirsOptions;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.clients.admin.ReplicaInfo;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MonitorUtils {
    public static final double UNIT_INTERVAL_TO_PERCENTAGE = 100.0;
    private static final Logger LOG = LoggerFactory.getLogger(MonitorUtils.class);
    private static final Set<Short> FOLLOWER_METRIC_DEF_IDS = new HashSet<Short>(Arrays.asList(KafkaMetricDef.commonMetricDefId(KafkaMetricDef.REPLICATION_BYTES_IN_RATE), KafkaMetricDef.commonMetricDefId(KafkaMetricDef.FETCH_FROM_FOLLOWER_BYTES_OUT_RATE), KafkaMetricDef.commonMetricDefId(KafkaMetricDef.FOLLOWER_FETCH_REQUEST_RATE), KafkaMetricDef.commonMetricDefId(KafkaMetricDef.FETCH_FROM_FOLLOWER_REQUEST_RATE)));

    private MonitorUtils() {
    }

    private static AggregatedMetricValues toFollowerMetricValues(AggregatedMetricValues aggregatedMetricValues) {
        AggregatedMetricValues followerLoad = new AggregatedMetricValues();
        for (short metricId : aggregatedMetricValues.metricIds()) {
            Resource metricResource = KafkaMetricDef.commonMetricDef().metricInfo(metricId).resource();
            if (Resource.CPU == metricResource || Resource.DISK == metricResource || FOLLOWER_METRIC_DEF_IDS.contains(metricId)) {
                followerLoad.add(metricId, aggregatedMetricValues.valuesFor(metricId));
                continue;
            }
            followerLoad.add(metricId, new MetricValues(aggregatedMetricValues.length()));
        }
        return followerLoad;
    }

    private static AggregatedMetricValues toLeaderMetricValues(AggregatedMetricValues aggregatedMetricValues) {
        AggregatedMetricValues leaderLoad = new AggregatedMetricValues();
        for (short metricId : aggregatedMetricValues.metricIds()) {
            if (FOLLOWER_METRIC_DEF_IDS.contains(metricId)) {
                leaderLoad.add(metricId, new MetricValues(aggregatedMetricValues.length()));
                continue;
            }
            leaderLoad.add(metricId, aggregatedMetricValues.valuesFor(metricId));
        }
        return leaderLoad;
    }

    public static boolean metadataChanged(MetadataClient.ClusterMetadata prev, MetadataClient.ClusterMetadata curr) {
        Cluster prevCluster = prev.cluster();
        Cluster currCluster = curr.cluster();
        HashSet prevNodeSet = new HashSet(prevCluster.nodes());
        if (prevNodeSet.size() != currCluster.nodes().size()) {
            return true;
        }
        prevNodeSet.removeAll(currCluster.nodes());
        if (!prevNodeSet.isEmpty()) {
            return true;
        }
        if (!prevCluster.topics().equals(currCluster.topics())) {
            return true;
        }
        for (String topic : prevCluster.topics()) {
            if (!prevCluster.partitionCountForTopic(topic).equals(currCluster.partitionCountForTopic(topic))) {
                return true;
            }
            for (PartitionInfo prevPartInfo : prevCluster.partitionsForTopic(topic)) {
                PartitionInfo currPartInfo;
                if (!MonitorUtils.leaderChanged(prevPartInfo, currPartInfo = currCluster.partition(new TopicPartition(prevPartInfo.topic(), prevPartInfo.partition()))) && !MonitorUtils.replicaListChanged(prevPartInfo, currPartInfo)) continue;
                return true;
            }
        }
        if (!prev.topicPlacements().equals(curr.topicPlacements())) {
            return true;
        }
        if (!prev.replicaExclusions().equals(curr.replicaExclusions())) {
            return true;
        }
        return !prev.reassigningPartitions().equals(curr.reassigningPartitions());
    }

    private static boolean leaderChanged(PartitionInfo prevPartInfo, PartitionInfo currPartInfo) {
        Node prevLeader = prevPartInfo.leader();
        Node currLeader = currPartInfo.leader();
        return !(prevLeader == null && currLeader == null || prevLeader != null && currLeader != null && prevLeader.id() == currLeader.id());
    }

    private static boolean replicaListChanged(PartitionInfo prevPartInfo, PartitionInfo currPartInfo) {
        int i;
        if (prevPartInfo.replicas().length != currPartInfo.replicas().length || prevPartInfo.observers().length != currPartInfo.observers().length) {
            return true;
        }
        for (i = 0; i < prevPartInfo.replicas().length; ++i) {
            if (prevPartInfo.replicas()[i].id() == currPartInfo.replicas()[i].id()) continue;
            return true;
        }
        for (i = 0; i < prevPartInfo.observers().length; ++i) {
            if (prevPartInfo.observers()[i].id() == currPartInfo.observers()[i].id()) continue;
            return true;
        }
        return false;
    }

    public static int totalNumPartitions(Cluster cluster) {
        int totalNumPartitions = 0;
        for (String topic : cluster.topics()) {
            totalNumPartitions += cluster.partitionCountForTopic(topic).intValue();
        }
        return totalNumPartitions;
    }

    static AggregatedMetricValues getAggregatedMetricValues(ValuesAndExtrapolations partitionValuesAndExtrapolations, ValuesAndExtrapolations replicaValuesAndExtrapolations, boolean isLeader) {
        AggregatedMetricValues partitionMetricValues = partitionValuesAndExtrapolations.metricValues();
        AggregatedMetricValues replicaMetricValues = replicaValuesAndExtrapolations.metricValues();
        AggregatedMetricValues replicaLoad = MonitorUtils.mergeMetricValues(partitionMetricValues, replicaMetricValues);
        replicaLoad = isLeader ? MonitorUtils.toLeaderMetricValues(replicaLoad) : MonitorUtils.toFollowerMetricValues(replicaLoad);
        return replicaLoad;
    }

    static AggregatedMetricValues mergeMetricValues(AggregatedMetricValues partitionMetricValues, AggregatedMetricValues replicaMetricValues) {
        short commonMetricDefId;
        KafkaMetricDef metricDef;
        AggregatedMetricValues mergedValues = new AggregatedMetricValues();
        for (short metricId : partitionMetricValues.metricIds()) {
            metricDef = KafkaMetricDef.partitionMetricDef().metricInfo(metricId).kafkaMetricDef();
            commonMetricDefId = KafkaMetricDef.commonMetricDefId(metricDef);
            mergedValues.add(commonMetricDefId, partitionMetricValues.valuesFor(metricId));
        }
        for (short metricId : replicaMetricValues.metricIds()) {
            metricDef = KafkaMetricDef.replicaMetricDef().metricInfo(metricId).kafkaMetricDef();
            commonMetricDefId = KafkaMetricDef.commonMetricDefId(metricDef);
            MetricValues currentMetricValues = replicaMetricValues.valuesFor(metricId);
            if (currentMetricValues.length() < mergedValues.length()) {
                LOG.warn("Replica metrics have fewer windows than partition metrics so we'll be filling up replica metric windows with 0 values.");
                double[] currentMetricValuesArr = currentMetricValues.doubleArray();
                double[] extendedArray = Utils.expandArrayFromTheBeginning(currentMetricValuesArr, mergedValues.length());
                currentMetricValues = new MetricValues(extendedArray.length);
                currentMetricValues.add(extendedArray);
            } else if (currentMetricValues.length() > mergedValues.length()) {
                String message = "Replica metrics have more windows than partition metrics and this is not supposed to happen.";
                LOG.error(message);
                throw new IllegalStateException(message);
            }
            mergedValues.add(commonMetricDefId, currentMetricValues);
        }
        return mergedValues;
    }

    public static String getRackHandleNull(Node node) {
        return node.rack() == null || node.rack().isEmpty() ? node.host() : node.rack();
    }

    static Set<Integer> brokersWithReplicas(Cluster cluster) {
        HashSet<Integer> allBrokers = new HashSet<Integer>();
        for (String topic : cluster.topics()) {
            for (PartitionInfo partition : cluster.partitionsForTopic(topic)) {
                Arrays.stream(partition.replicas()).map(Node::id).forEach(allBrokers::add);
            }
        }
        return allBrokers;
    }

    static Set<Integer> deadBrokersWithReplicas(Cluster cluster) {
        Set<Integer> brokersWithReplicas = MonitorUtils.brokersWithReplicas(cluster);
        cluster.nodes().forEach(node -> brokersWithReplicas.remove(node.id()));
        return brokersWithReplicas;
    }

    static Set<Integer> brokersWithOfflineReplicas(Cluster cluster) {
        HashSet<Integer> brokersWithOfflineReplicas = new HashSet<Integer>();
        for (String topic : cluster.topics()) {
            for (PartitionInfo partition : cluster.partitionsForTopic(topic)) {
                if (partition.leader() == null) continue;
                brokersWithOfflineReplicas.addAll(Arrays.stream(partition.offlineReplicas()).map(Node::id).collect(Collectors.toSet()));
            }
        }
        return brokersWithOfflineReplicas;
    }

    static Map<Integer, Broker.Strategy> consolidateBrokerStrategies(Cluster cluster, Set<Integer> ignoredBrokerIds, Map<Integer, Broker.Strategy> preExistingBrokerStatesById) {
        Map<Integer, Broker.Strategy> brokerStatesById = cluster.nodes().stream().collect(Collectors.toMap(Node::id, node -> Broker.Strategy.ALIVE));
        ignoredBrokerIds.forEach(ignoredBrokerId -> brokerStatesById.put((Integer)ignoredBrokerId, Broker.Strategy.IGNORE));
        MonitorUtils.deadBrokersWithReplicas(cluster).forEach(brokerId -> brokerStatesById.put((Integer)brokerId, Broker.Strategy.DEAD));
        for (Integer n : MonitorUtils.brokersWithOfflineReplicas(cluster)) {
            if (brokerStatesById.get(n) == Broker.Strategy.DEAD) continue;
            brokerStatesById.put(n, Broker.Strategy.BAD_DISKS);
        }
        for (Map.Entry entry : preExistingBrokerStatesById.entrySet()) {
            brokerStatesById.put((Integer)entry.getKey(), (Broker.Strategy)((Object)entry.getValue()));
        }
        return brokerStatesById;
    }

    static Map<TopicPartition, List<SampleExtrapolation>> partitionSampleExtrapolations(Map<PartitionEntity, ValuesAndExtrapolations> valuesAndExtrapolations) {
        HashMap<TopicPartition, List<SampleExtrapolation>> sampleExtrapolations = new HashMap<TopicPartition, List<SampleExtrapolation>>();
        for (Map.Entry<PartitionEntity, ValuesAndExtrapolations> entry : valuesAndExtrapolations.entrySet()) {
            TopicPartition tp = entry.getKey().tp();
            Map<Integer, Extrapolation> extrapolations = entry.getValue().extrapolations();
            if (extrapolations.isEmpty()) continue;
            List extrapolationForPartition = sampleExtrapolations.computeIfAbsent(tp, p -> new ArrayList());
            extrapolations.forEach((t, extrapolation) -> extrapolationForPartition.add(new SampleExtrapolation(t.intValue(), (Extrapolation)((Object)extrapolation))));
        }
        return sampleExtrapolations;
    }

    static Map<TopicPartition, Map<Integer, String>> getReplicaPlacementInfo(ClusterModel clusterModel, Cluster cluster, ConfluentAdmin adminClient, KafkaCruiseControlConfig config) {
        HashMap<TopicPartition, Map<Integer, String>> replicaPlacementInfo = new HashMap<TopicPartition, Map<Integer, String>>();
        Map logDirsByBrokerId = adminClient.describeLogDirs((Collection)cluster.nodes().stream().mapToInt(Node::id).boxed().collect(Collectors.toList()), (DescribeLogDirsOptions)new DescribeLogDirsOptions().timeoutMs(config.getInt("logdir.response.timeout.ms"))).descriptions();
        for (Map.Entry entry : logDirsByBrokerId.entrySet()) {
            Integer brokerId = (Integer)entry.getKey();
            try {
                ((Map)((KafkaFuture)entry.getValue()).get()).forEach((logdir, info) -> {
                    if (info.error() == null) {
                        for (Map.Entry e : info.replicaInfos().entrySet()) {
                            if (!((ReplicaInfo)e.getValue()).isFuture()) {
                                replicaPlacementInfo.putIfAbsent((TopicPartition)e.getKey(), new HashMap());
                                ((Map)replicaPlacementInfo.get(e.getKey())).put(brokerId, logdir);
                                continue;
                            }
                            LOG.trace("Topic partition {}'s replica is moving to {} on broker {}.", new Object[]{e.getKey(), logdir, brokerId});
                        }
                    } else {
                        clusterModel.broker(brokerId).disk((String)logdir).setState(Disk.State.DEAD);
                    }
                });
            }
            catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(String.format("Populating logdir information for broker %d encountered Exception %s.", entry.getKey(), e));
            }
        }
        return replicaPlacementInfo;
    }

    static void populatePartitionLoad(PartitionInfo partitionInfo, ClusterModel clusterModel, TopicPartition tp, ValuesAndExtrapolations partitionValues, Map<ReplicaEntity, ValuesAndExtrapolations> replicaValuesAndExtrapolations, Map<TopicPartition, Map<Integer, String>> replicaPlacementInfo, BrokerCapacityConfigResolver brokerCapacityConfigResolver) {
        for (int index = 0; index < partitionInfo.replicas().length; ++index) {
            Node replica = partitionInfo.replicas()[index];
            String rack = MonitorUtils.getRackHandleNull(replica);
            BrokerCapacityInfo brokerCapacity = brokerCapacityConfigResolver.capacityForBroker(rack, replica.host(), replica.id());
            clusterModel.handleDeadBroker(rack, replica.id(), brokerCapacity);
            if (partitionInfo.leader() == null) {
                LOG.warn("Detected offline partition {}-{}, skipping", (Object)partitionInfo.topic(), (Object)partitionInfo.partition());
                continue;
            }
            boolean isLeader = replica.id() == partitionInfo.leader().id();
            boolean isObserver = Arrays.stream(partitionInfo.observers()).anyMatch(observer -> observer.id() == replica.id());
            boolean isOffline = Arrays.stream(partitionInfo.offlineReplicas()).anyMatch(offlineReplica -> offlineReplica.id() == replica.id());
            ReplicaEntity replicaEntity = new ReplicaEntity(partitionInfo, replica.id(), isLeader);
            ValuesAndExtrapolations replicaValues = replicaValuesAndExtrapolations.get(replicaEntity);
            if (replicaValues == null) {
                LOG.debug("skipping replica {} creation in the cluster model", (Object)replicaEntity);
                continue;
            }
            String logdir = replicaPlacementInfo == null ? null : replicaPlacementInfo.get(tp).get(replica.id());
            clusterModel.createReplica(rack, replica.id(), tp, index, isLeader, isOffline, logdir, false, isObserver);
            clusterModel.setReplicaLoad(rack, replica.id(), tp, MonitorUtils.getAggregatedMetricValues(partitionValues, replicaValues, isLeader), partitionValues.windows());
        }
    }
}

