/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.PartitionBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.ReplicaBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.common.Statistic;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.util.ClusterModelStatsComparator;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LeaderBytesInDistributionGoal
extends AbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger(LeaderBytesInDistributionGoal.class);
    private double lowUtilizationThreshold;
    private double meanLeaderBytesIn;
    private Set<Integer> overLimitBrokerIds;

    public LeaderBytesInDistributionGoal() {
    }

    LeaderBytesInDistributionGoal(BalancingConstraint balancingConstraint) {
        this.balancingConstraint = balancingConstraint;
    }

    @Override
    public ActionAcceptance replicaActionAcceptance(ReplicaBalancingAction action, ClusterModel clusterModel) {
        double newDestLeaderBytesIn;
        Replica sourceReplica = clusterModel.broker(action.sourceBrokerId()).replica(action.topicPartition());
        Broker destinationBroker = clusterModel.broker(action.destinationBrokerId());
        this.initMeanLeaderBytesIn(clusterModel);
        if (!sourceReplica.isLeader()) {
            switch (action.balancingAction()) {
                case INTER_BROKER_REPLICA_SWAP: {
                    if (destinationBroker.replica(action.destinationTopicPartition()).isLeader()) break;
                    return ActionAcceptance.ACCEPT;
                }
                case INTER_BROKER_REPLICA_MOVEMENT: {
                    return ActionAcceptance.ACCEPT;
                }
                case LEADERSHIP_MOVEMENT: {
                    throw new IllegalStateException("Attempt to move leadership from the follower.");
                }
                default: {
                    throw new IllegalArgumentException("Unsupported balancing action " + (Object)((Object)action.balancingAction()) + " is provided.");
                }
            }
        }
        double sourceReplicaUtilization = sourceReplica.load().expectedUtilizationFor(Resource.NW_IN);
        switch (action.balancingAction()) {
            case INTER_BROKER_REPLICA_SWAP: {
                double destinationReplicaUtilization = destinationBroker.replica(action.destinationTopicPartition()).load().expectedUtilizationFor(Resource.NW_IN);
                newDestLeaderBytesIn = destinationBroker.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN) + sourceReplicaUtilization - destinationReplicaUtilization;
                Broker sourceBroker = clusterModel.broker(action.sourceBrokerId());
                double newSourceLeaderBytesIn = sourceBroker.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN) + destinationReplicaUtilization - sourceReplicaUtilization;
                if (!(newSourceLeaderBytesIn > this.balanceThreshold(clusterModel, sourceBroker.id()))) break;
                return ActionAcceptance.REPLICA_REJECT;
            }
            case INTER_BROKER_REPLICA_MOVEMENT: 
            case LEADERSHIP_MOVEMENT: {
                newDestLeaderBytesIn = destinationBroker.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN) + sourceReplicaUtilization;
                break;
            }
            default: {
                throw new IllegalArgumentException("Unsupported balancing action " + (Object)((Object)action.balancingAction()) + " is provided.");
            }
        }
        return !(newDestLeaderBytesIn > this.balanceThreshold(clusterModel, destinationBroker.id())) ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
    }

    @Override
    public ActionAcceptance partitionActionAcceptance(PartitionBalancingAction action, ClusterModel clusterModel) {
        return ActionAcceptance.REPLICA_REJECT;
    }

    @Override
    public ClusterModelStatsComparator clusterModelStatsComparator() {
        return new LeaderBytesInDistributionGoalStatsComparator();
    }

    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(Math.max(1, this.numWindows / 2), this.minMonitoredPartitionPercentage, false);
    }

    @Override
    public String name() {
        return LeaderBytesInDistributionGoal.class.getSimpleName();
    }

    @Override
    public boolean isHardGoal() {
        return false;
    }

    @Override
    protected SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        SortedSet brokersToBalance = clusterModel.eligibleSourceBrokers().stream().filter(b -> b.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN) > this.balanceThreshold(clusterModel, b.id())).collect(Collectors.toCollection(TreeSet::new));
        return brokersToBalance;
    }

    @Override
    public boolean replicaActionSelfSatisfied(ClusterModel clusterModel, ReplicaBalancingAction action) {
        if (action.balancingAction() != ActionType.LEADERSHIP_MOVEMENT) {
            throw new IllegalStateException("Found balancing action " + (Object)((Object)action.balancingAction()) + " but expected leadership movement.");
        }
        return this.actionAcceptance(action, clusterModel) == ActionAcceptance.ACCEPT;
    }

    @Override
    public boolean partitionActionSelfSatisfied(ClusterModel clusterModel, PartitionBalancingAction action) {
        return false;
    }

    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions, Optional<OptimizationMetrics> optimizationMetricsOpt) {
        this.meanLeaderBytesIn = 0.0;
        this.overLimitBrokerIds = new HashSet<Integer>();
        this.lowUtilizationThreshold = this.balancingConstraint.lowUtilizationThreshold(Resource.NW_IN, optimizationOptions);
    }

    @Override
    public void finish() {
        this.finished = true;
        this.overLimitBrokerIds.clear();
    }

    @Override
    protected void updateGoalState(ClusterModel clusterModel, Set<String> excludedTopics) {
        if (!this.overLimitBrokerIds.isEmpty()) {
            LOG.debug("There were still {} brokers over the upper leader bytes in balance threshold.", (Object)this.overLimitBrokerIds.size());
            this.optimizationResultBuilder.markUnsuccessfulOptimization();
        }
        this.finish();
    }

    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
        double balanceThreshold = this.balanceThreshold(clusterModel, broker.id());
        if (broker.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN) < balanceThreshold) {
            return;
        }
        Set<String> excludedTopics = optimizationOptions.excludedTopics();
        List leaderReplicasSortedByBytesIn = broker.replicas().stream().filter(Replica::isLeader).filter(r -> !LeaderBytesInDistributionGoal.shouldExclude(r, excludedTopics)).sorted((a, b) -> Double.compare(b.load().expectedUtilizationFor(Resource.NW_IN), a.load().expectedUtilizationFor(Resource.NW_IN))).collect(Collectors.toList());
        boolean overThreshold = true;
        Iterator leaderReplicaIt = leaderReplicasSortedByBytesIn.iterator();
        while (overThreshold && leaderReplicaIt.hasNext()) {
            Replica leaderReplica = (Replica)leaderReplicaIt.next();
            List<Replica> onlineFollowers = clusterModel.partition(leaderReplica.topicPartition()).onlineFollowers();
            List<Broker> eligibleBrokers = onlineFollowers.stream().map(Replica::broker).filter(Broker::isEligibleDestination).sorted(Comparator.comparingDouble(a -> a.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN))).collect(Collectors.toList());
            this.maybeApplyBalancingAction(clusterModel, leaderReplica, eligibleBrokers, ActionType.LEADERSHIP_MOVEMENT, optimizedGoals, optimizationOptions, Optional.empty());
            overThreshold = broker.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN) > balanceThreshold;
        }
        if (overThreshold) {
            this.overLimitBrokerIds.add(broker.id());
        }
    }

    private void initMeanLeaderBytesIn(ClusterModel clusterModel) {
        if (this.meanLeaderBytesIn == 0.0) {
            this.meanLeaderBytesIn = this.computeMeanLeaderBytesIn(clusterModel);
        }
    }

    double computeMeanLeaderBytesIn(ClusterModel clusterModel) {
        SortedSet<Broker> currentBrokerSet = clusterModel.eligibleSourceBrokers();
        int brokerCount = clusterModel.eligibleDestinationBrokers().size();
        double accumulator = 0.0;
        for (Broker broker : currentBrokerSet) {
            accumulator += broker.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN);
        }
        return accumulator / (double)brokerCount;
    }

    private double balanceThreshold(ClusterModel clusterModel, int brokerId) {
        this.initMeanLeaderBytesIn(clusterModel);
        return Math.max(this.meanLeaderBytesIn * this.balancingConstraint.resourceBalancePercentage(Resource.NW_IN), this.lowUtilizationThreshold * clusterModel.broker(brokerId).capacity().totalCapacityFor(Resource.NW_IN));
    }

    private class LeaderBytesInDistributionGoalStatsComparator
    implements ClusterModelStatsComparator {
        private String reasonForLastNegativeResult;

        private LeaderBytesInDistributionGoalStatsComparator() {
        }

        @Override
        public int compare(ClusterModelStats stats1, ClusterModelStats stats2) {
            double meanPreLeaderBytesIn = stats1.resourceUtilizationStats().get((Object)Statistic.AVG).get((Object)Resource.NW_IN);
            double threshold = meanPreLeaderBytesIn * LeaderBytesInDistributionGoal.this.balancingConstraint.resourceBalancePercentage(Resource.NW_IN);
            if (stats1.resourceUtilizationStats().get((Object)Statistic.MAX).get((Object)Resource.NW_IN) <= threshold) {
                return 1;
            }
            double variance1 = stats1.resourceUtilizationStats().get((Object)Statistic.ST_DEV).get((Object)Resource.NW_IN);
            double variance2 = stats2.resourceUtilizationStats().get((Object)Statistic.ST_DEV).get((Object)Resource.NW_IN);
            int result = AnalyzerUtils.compare(Math.sqrt(variance2), Math.sqrt(variance1), Resource.NW_IN);
            if (result < 0) {
                this.reasonForLastNegativeResult = String.format("Violated leader bytes in balancing. preVariance: %.3f postVariance: %.3f.", variance2, variance1);
            }
            return result;
        }

        @Override
        public String explainLastComparison() {
            return this.reasonForLastNegativeResult;
        }
    }
}

