/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.PartitionBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ChainReplicaFilter;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.DistributionThresholdUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.internals.BrokerResourceStats;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.internals.CandidateBroker;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.internals.DetailedProposal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.internals.ResourceDistributionStatsSnapshot;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.util.ResourceDistributionLogger;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Load;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.model.util.BrokerByResourceUtilizationComparator;
import com.linkedin.kafka.cruisecontrol.model.util.ClusterModelStatsByResourceUtilizationDeviationComparator;
import com.linkedin.kafka.cruisecontrol.model.util.ClusterModelStatsComparator;
import com.linkedin.kafka.cruisecontrol.model.util.ReplicaByResourceUtilizationComparator;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for goals that balance the utilization of a single {@link Resource} across brokers.
 *
 * <p>Subclasses identify the resource via {@link #resource()}, drive the per-broker work in
 * {@code doRebalance}, and decide when moving load out of or into a broker is complete. This class
 * computes the thresholds in {@code initGoalState}, decides in {@code rebalanceForBroker} whether a
 * broker needs less or more load, and provides the load-out / load-in building blocks.
 */
public abstract class ResourceDistributionAbstractGoal
extends AbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceDistributionAbstractGoal.class);
    // Reset to false by initGoalState and switched on by setFixOfflineReplicasOnly(). While set,
    // rebalanceByMovingLoadOut considers every eligible destination broker and does not stop early
    // as long as the broker still has offline replicas.
    protected boolean fixOfflineReplicasOnly;
    // Per-broker resource stats keyed by broker id, captured in initGoalState for the eligible
    // destination brokers.
    protected Map<Integer, BrokerResourceStats> initialResourceDistribution;
    // Percentage thresholds computed in initGoalState for resource().
    protected DistributionThresholdUtils.ResourcePercentageThresholds thresholds;
    // Working state for the broker currently handled by rebalanceForBroker: whether load still has
    // to be moved out of it. Updated by the perform*Movement methods.
    protected boolean requireLessLoad;
    // Working state for the broker currently handled by rebalanceForBroker: whether load still has
    // to be moved into it. Updated by the perform*Movement methods.
    protected boolean requireMoreLoad;
    // Set by rebalanceForBroker to true when the broker has no offline replicas and the cluster
    // model reports self-healing-eligible replicas; forwarded to rebalanceByMovingLoadIn for
    // replica movements.
    protected boolean moveImmigrantsOnly;

    /**
     * No-argument constructor. It assigns no state of its own; everything beyond the superclass
     * constructor is left at its default value.
     */
    public ResourceDistributionAbstractGoal() {
        super();
    }

    /**
     * Package-private constructor that installs the given balancing constraint directly.
     *
     * @param balancingConstraint constraint stored in the inherited {@code balancingConstraint} field
     */
    ResourceDistributionAbstractGoal(BalancingConstraint balancingConstraint) {
        super();
        this.balancingConstraint = balancingConstraint;
    }

    /**
     * @return the current value of the {@code fixOfflineReplicasOnly} flag
     */
    boolean fixOfflineReplicasOnly() {
        return fixOfflineReplicasOnly;
    }

    /**
     * Turns the {@code fixOfflineReplicasOnly} flag on. There is no counterpart that turns it off;
     * the flag is only reset by {@code initGoalState}.
     */
    void setFixOfflineReplicasOnly() {
        fixOfflineReplicasOnly = true;
    }

    /**
     * Returns the resource whose distribution this goal balances. Every utilization and capacity
     * lookup in this class is made for this resource.
     *
     * @return the resource handled by the concrete goal
     */
    protected abstract Resource resource();

    /**
     * Supplies the last argument passed to {@code ResourcePercentageThresholds.compute} in
     * {@code initGoalState}. The base implementation returns {@code true}.
     *
     * <p>NOTE(review): presumably this toggles validation of the computed percentages - confirm
     * against {@code DistributionThresholdUtils}.
     *
     * @return {@code true} in this base implementation
     */
    protected boolean validatePercentages() {
        return true;
    }

    /**
     * Evaluates a partition balancing action. This implementation ignores both arguments and always
     * answers {@link ActionAcceptance#REPLICA_REJECT}.
     *
     * @param action the proposed partition balancing action (unused)
     * @param clusterModel the cluster model the action would apply to (unused)
     * @return always {@code ActionAcceptance.REPLICA_REJECT}
     */
    @Override
    public ActionAcceptance partitionActionAcceptance(PartitionBalancingAction action, ClusterModel clusterModel) {
        return ActionAcceptance.REPLICA_REJECT;
    }

    /**
     * Builds the comparator used to compare cluster model stats for this goal.
     *
     * @return a new {@link ClusterModelStatsByResourceUtilizationDeviationComparator} created from
     *         this goal's name and resource
     */
    @Override
    public ClusterModelStatsComparator clusterModelStatsComparator() {
        String goalName = name();
        Resource balancedResource = resource();
        return new ClusterModelStatsByResourceUtilizationDeviationComparator(goalName, balancedResource);
    }

    /**
     * Describes how complete the cluster model has to be for this goal.
     *
     * @return requirements built from half of {@code numWindows} (but never less than one), the
     *         configured {@code minMonitoredPartitionPercentage}, and {@code false} as the third
     *         constructor argument
     */
    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(
            Math.max(1, numWindows / 2),
            minMonitoredPartitionPercentage,
            false);
    }

    /**
     * Returns the name of the concrete goal. Within this class the name is used for the stats
     * comparator, the distribution logger and as a prefix of {@link #sortName()}.
     *
     * @return the goal name
     */
    @Override
    public abstract String name();

    /**
     * Reports whether this goal is a hard goal.
     *
     * @return always {@code false}
     */
    @Override
    public boolean isHardGoal() {
        return false;
    }

    /**
     * Reports whether the given partition balancing action satisfies this goal. This implementation
     * ignores both arguments.
     *
     * @param clusterModel the cluster model (unused)
     * @param action the partition balancing action (unused)
     * @return always {@code false}
     */
    @Override
    public boolean partitionActionSelfSatisfied(ClusterModel clusterModel, PartitionBalancingAction action) {
        return false;
    }

    /**
     * Marks the goal as finished by setting the inherited {@code finished} flag.
     */
    @Override
    public void finish() {
        finished = true;
    }

    /**
     * Prepares the goal for an optimization run.
     *
     * <p>In order: clears {@code fixOfflineReplicasOnly}, computes the thresholds for
     * {@link #resource()}, snapshots the per-broker resource stats of the eligible destination
     * brokers, registers the sorted-replica tracking under {@link #sortName()}, logs the resource
     * distribution, and - when an {@link OptimizationMetrics} instance is supplied - records the
     * distribution balance stats on it.
     *
     * @param clusterModel the cluster model being optimized
     * @param optimizationOptions options of the current optimization
     * @param optimizationMetricsOpt optional metrics sink; when empty nothing is recorded
     * @throws OptimizationFailureException declared by the overridden method
     */
    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions, Optional<OptimizationMetrics> optimizationMetricsOpt) throws OptimizationFailureException {
        fixOfflineReplicasOnly = false;
        thresholds = DistributionThresholdUtils.ResourcePercentageThresholds.compute(
            clusterModel, optimizationOptions, balancingConstraint, resource(), validatePercentages());

        // One stats entry per eligible destination broker, keyed by broker id.
        initialResourceDistribution = clusterModel.eligibleDestinationBrokers().stream()
            .map(destinationBroker -> new BrokerResourceStats(
                destinationBroker.capacity().totalCapacityFor(resource()),
                destinationBroker.load().expectedUtilizationFor(resource()),
                destinationBroker.id(),
                resource()))
            .collect(Collectors.toMap(stats -> stats.brokerId, stats -> stats));

        clusterModel.trackSortedReplicas(
            clusterModel.eligibleSourceOrDestinationBrokers(),
            sortName(),
            null,
            ReplicaSortFunctionFactory.deprioritizeOfflineReplicasThenImmigrants(),
            ReplicaSortFunctionFactory.sortByMetricResourceValue(resource()));

        ResourceDistributionLogger.Builder distributionLoggerBuilder = ResourceDistributionLogger.builder()
            .goalName(name())
            .thresholds(thresholds)
            .optimizationOptions(optimizationOptions);
        Optional<ResourceDistributionStatsSnapshot> snapshotOpt =
            tryToComputeResourceDistributionStatsSnapshot(initialResourceDistribution.values());
        // The snapshot is optional; the distribution is logged with or without it.
        snapshotOpt.ifPresent(distributionLoggerBuilder::statsSnapshot);
        distributionLoggerBuilder.build().logResourceDistribution();

        if (!optimizationMetricsOpt.isPresent()) {
            return;
        }
        if (!snapshotOpt.isPresent()) {
            LOG.warn("Will not be reporting metrics as part of evaluating whether to trigger the even-cluster load task for {} because a resource distribution stats snapshot could not be computed. Check the previous logs for more information and whether the cluster had the same capacities per broker.", this.getClass());
            return;
        }
        optimizationMetricsOpt.get().recordDistributionBalanceStats(this, snapshotOpt.get());
    }

    /**
     * Attempts to compute a distribution stats snapshot for the given broker stats using the current
     * {@code thresholds}.
     *
     * @param resourceStats per-broker resource stats to summarize
     * @return whatever {@code ResourceDistributionStatsSnapshot.tryCompute} yields; may be empty
     */
    Optional<ResourceDistributionStatsSnapshot> tryToComputeResourceDistributionStatsSnapshot(Collection<BrokerResourceStats> resourceStats) {
        return ResourceDistributionStatsSnapshot.tryCompute(thresholds, resourceStats);
    }

    /**
     * Decides whether the given broker needs rebalancing and, if so, delegates to
     * {@code doRebalance}.
     *
     * <p>Nothing happens when the broker has no offline replicas and none of the brokers to balance
     * is over the low-utilization threshold. Otherwise {@code requireLessLoad},
     * {@code requireMoreLoad} and {@code moveImmigrantsOnly} are (re)computed for this broker before
     * delegating.
     *
     * @param broker the broker to balance
     * @param clusterModel the cluster model being optimized
     * @param optimizedGoals goals that have already been optimized
     * @param optimizationOptions options of the current optimization
     * @throws OptimizationFailureException declared by the overridden method
     */
    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        final boolean hasOfflineReplicas = broker.currentOfflineReplicas().size() > 0;
        if (!hasOfflineReplicas
            && clusterModel.brokersOverThreshold(brokersToBalance(clusterModel), resource(), thresholds.lowUtilizationPercent()).isEmpty()) {
            return;
        }

        // A broker with offline replicas must always shed load, provided it is an eligible source.
        requireLessLoad = broker.isEligibleSource() && (hasOfflineReplicas || !isLoadUnderBalanceUpperLimit(broker));
        requireMoreLoad = broker.isEligibleDestination() && !isLoadAboveBalanceLowerLimit(broker);
        moveImmigrantsOnly = false;

        if (broker.currentOfflineReplicas().isEmpty()) {
            if (!(requireMoreLoad || requireLessLoad)) {
                // Already within limits in both directions.
                return;
            }
            moveImmigrantsOnly = !clusterModel.selfHealingEligibleReplicas().isEmpty();
            if (moveImmigrantsOnly && requireLessLoad && broker.immigrantReplicas().isEmpty()) {
                // Only immigrants may move, and this broker has none to give away.
                return;
            }
        }
        doRebalance(broker, clusterModel, optimizedGoals, optimizationOptions);
    }

    /**
     * Performs the actual rebalance for one broker. Invoked by {@code rebalanceForBroker} after it
     * has set {@code requireLessLoad}, {@code requireMoreLoad} and {@code moveImmigrantsOnly} for
     * that broker.
     *
     * @param var1 the broker to balance
     * @param var2 the cluster model being optimized
     * @param var3 goals that have already been optimized
     * @param var4 options of the current optimization
     */
    protected abstract void doRebalance(Broker var1, ClusterModel var2, Set<Goal> var3, OptimizationOptions var4);

    /**
     * Tries to balance the broker through leadership movements and stores the outcome back into
     * {@code requireLessLoad} / {@code requireMoreLoad}.
     *
     * <p>Skipped entirely when leadership movement should not be tried for {@link #resource()}, or
     * when only offline replicas are being fixed and the broker still has offline replicas.
     *
     * @param broker the broker to balance
     * @param clusterModel the cluster model being optimized
     * @param optimizedGoals goals that have already been optimized
     * @param optimizationOptions options of the current optimization
     */
    protected void performLeadershipMovement(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
        if (!shouldTryLeadershipMovement(resource())) {
            return;
        }
        if (fixOfflineReplicasOnly && !broker.currentOfflineReplicas().isEmpty()) {
            return;
        }
        if (requireLessLoad) {
            requireLessLoad = rebalanceByMovingLoadOut(broker, clusterModel, optimizedGoals, ActionType.LEADERSHIP_MOVEMENT, optimizationOptions);
        }
        if (requireMoreLoad) {
            // Leadership movement into the broker is never restricted to immigrants.
            requireMoreLoad = rebalanceByMovingLoadIn(broker, clusterModel, optimizedGoals, ActionType.LEADERSHIP_MOVEMENT, optimizationOptions, false);
        }
    }

    /**
     * Tries to balance the broker through inter-broker replica movements and stores the outcome back
     * into {@code requireLessLoad} / {@code requireMoreLoad}. A direction is only attempted while
     * its flag is still set.
     *
     * @param broker the broker to balance
     * @param clusterModel the cluster model being optimized
     * @param optimizedGoals goals that have already been optimized
     * @param optimizationOptions options of the current optimization
     */
    protected void performReplicaMovement(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
        // Short-circuit keeps a cleared flag cleared without invoking the movement at all.
        requireLessLoad = requireLessLoad
            && rebalanceByMovingLoadOut(broker, clusterModel, optimizedGoals, ActionType.INTER_BROKER_REPLICA_MOVEMENT, optimizationOptions);
        requireMoreLoad = requireMoreLoad
            && rebalanceByMovingLoadIn(broker, clusterModel, optimizedGoals, ActionType.INTER_BROKER_REPLICA_MOVEMENT, optimizationOptions, moveImmigrantsOnly);
    }

    /**
     * Tries to reduce the load of {@code broker} by moving replicas or leadership to other brokers.
     *
     * <p>Destination candidates are the eligible destination brokers (all of them when only offline
     * replicas are being fixed, otherwise only those under the balance upper threshold). The
     * broker's tracked replicas are visited in reverse sorted order; for leadership movement only
     * leaders are considered and the destinations are narrowed to the partition's online follower
     * brokers.
     *
     * @param broker the broker to unload; must not use the {@code IGNORE} strategy
     * @param clusterModel the cluster model being optimized
     * @param optimizedGoals goals that have already been optimized
     * @param actionType the kind of movement to attempt
     * @param optimizationOptions options of the current optimization
     * @return {@code false} when unloading is complete (the completion check passed after an
     *         applied action, or the broker has no replicas left), {@code true} otherwise
     * @throws IllegalArgumentException if the broker's strategy is {@code IGNORE}
     */
    boolean rebalanceByMovingLoadOut(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, ActionType actionType, OptimizationOptions optimizationOptions) {
        if (broker.strategy() == Broker.Strategy.IGNORE) {
            throw new IllegalArgumentException("rebalanceByMovingLoadOut doesn't accept ignored broker as input.");
        }
        final boolean leadershipMovement = actionType == ActionType.LEADERSHIP_MOVEMENT;
        Set<String> excludedTopics = optimizationOptions.excludedTopics();

        TreeSet<Broker> destinationCandidates = new TreeSet<Broker>(BrokerByResourceUtilizationComparator.of(resource(), false));
        if (!fixOfflineReplicasOnly) {
            destinationCandidates.addAll(clusterModel.brokersUnderThreshold(clusterModel.eligibleDestinationBrokers(), resource(), thresholds.balanceUpperPercent()));
        } else {
            destinationCandidates.addAll(clusterModel.eligibleDestinationBrokers());
        }

        List<Replica> replicasToMove = broker.trackedSortedReplicas(sortName()).reverselySortedReplicas();
        if (leadershipMovement) {
            replicasToMove = replicasToMove.stream()
                .filter(Replica::isLeader)
                .collect(Collectors.toList());
        } else if (!clusterModel.selfHealingEligibleReplicas().isEmpty() && broker.isAlive()) {
            // Self-healing in progress on a live broker: only offline or immigrant replicas may leave.
            replicasToMove = replicasToMove.stream()
                .filter(r -> broker.currentOfflineReplicas().contains(r) || broker.immigrantReplicas().contains(r))
                .collect(Collectors.toList());
        }

        for (Replica replica : replicasToMove) {
            if (ResourceDistributionAbstractGoal.shouldExclude(replica, excludedTopics)) {
                continue;
            }
            double replicaLoad = replica.load().expectedUtilizationFor(resource());
            if (AnalyzerUtils.isEqual(replicaLoad, 0.0) && !replica.isCurrentOffline()) {
                // A zero-load replica that is not offline ends the scan.
                break;
            }

            final TreeSet<Broker> eligibleBrokers;
            if (leadershipMovement) {
                eligibleBrokers = new TreeSet<Broker>(BrokerByResourceUtilizationComparator.of(resource(), false));
                clusterModel.partition(replica.topicPartition()).onlineFollowerBrokers().stream()
                    .filter(destinationCandidates::contains)
                    .forEach(eligibleBrokers::add);
            } else {
                eligibleBrokers = destinationCandidates;
            }

            double brokerUtilizationBeforeMove = broker.load(resource()).expectedUtilizationFor(resource());
            DetailedProposal.DetailedReasonBuilder reasonBuilder = destination -> String.format("Moving %.2f of %s load out from broker %d to %s in order to decrease the usage on Broker %d by %.2f - Current utilization: %.2f - Target utilization: %.2f", replicaLoad, resource(), broker.id(), destination, broker.id(), replicaLoad, brokerUtilizationBeforeMove, thresholds.balanceUpperPercent());

            Optional<DetailedProposal.Builder> proposalBuilderOpt;
            if (proposalTrackingOptions().isEnabled) {
                proposalBuilderOpt = Optional.of(DetailedProposal.builder(replica.topicPartition(), broker.id(), replicaLoad, actionType));
            } else {
                proposalBuilderOpt = Optional.empty();
            }

            Broker appliedDestination = maybeApplyBalancingAction(clusterModel, replica, eligibleBrokers, actionType, optimizedGoals, optimizationOptions, reasonBuilder, proposalBuilderOpt);
            if (appliedDestination == null) {
                // No action could be applied for this replica; try the next one.
                continue;
            }

            // While fixing offline replicas, keep going until none are left on the broker.
            boolean completionCheckAllowed = !fixOfflineReplicasOnly || broker.currentOfflineReplicas().isEmpty();
            if (completionCheckAllowed && isRebalanceByMovingLoadOutCompleted(broker)) {
                LOG.debug("Successfully balanced {} for broker {} by moving {} out.", resource(), broker.id(), actionType);
                return false;
            }

            // Drop the destination and re-add it only while it remains a valid candidate.
            // NOTE(review): the removal runs after the destination already received load; if the
            // comparator orders by current utilization, confirm the set still locates the entry.
            destinationCandidates.remove(appliedDestination);
            if (fixOfflineReplicasOnly
                || AnalyzerUtils.isSmaller(GoalUtils.utilizationPercentage(appliedDestination, resource()), thresholds.balanceUpperPercent())) {
                destinationCandidates.add(appliedDestination);
            }
        }

        boolean brokerHasNoReplicasLeft = broker.replicas().isEmpty();
        if (brokerHasNoReplicasLeft) {
            LOG.debug("Successfully balanced {} for broker {} by moving {} out.", resource(), broker.id(), actionType);
        }
        return !brokerHasNoReplicasLeft;
    }

    /**
     * Completion check consulted by {@code rebalanceByMovingLoadOut} after each successfully applied
     * action. Returning {@code true} makes that method stop and report that no more load needs to
     * be moved out.
     *
     * @param var1 the broker load is being moved out of
     * @return {@code true} when moving load out of the broker is complete
     */
    protected abstract boolean isRebalanceByMovingLoadOutCompleted(Broker var1);

    /**
     * Tries to increase the load of {@code broker} by moving replicas or leadership onto it from
     * other brokers.
     *
     * <p>Source candidates are the eligible source brokers whose utilization percentage is above the
     * mean utilization threshold. They are queued in a priority queue and drained one at a time;
     * after each applied action the current source is put back in the queue (and the next one
     * polled) when its utilization has dropped below that of the next queued source.
     *
     * <p>When the cluster model contains new brokers and {@code broker} is not one of them, nothing
     * is attempted and {@code true} is returned.
     *
     * @param broker the broker to load; must not use the {@code IGNORE} strategy
     * @param clusterModel the cluster model being optimized
     * @param optimizedGoals goals that have already been optimized
     * @param actionType the kind of movement to attempt
     * @param optimizationOptions options of the current optimization
     * @param moveImmigrantsOnly forwarded to {@code GoalUtils.filterReplicas} when selecting the
     *        replicas that may move in
     * @return {@code false} when the completion check passed after an applied action, {@code true}
     *         when the broker still requires more load
     * @throws IllegalArgumentException if the broker's strategy is {@code IGNORE}
     */
    boolean rebalanceByMovingLoadIn(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, ActionType actionType, OptimizationOptions optimizationOptions, boolean moveImmigrantsOnly) {
        if (broker.strategy() == Broker.Strategy.IGNORE) {
            throw new IllegalArgumentException("rebalanceByMovingLoadIn doesn't accept ignored broker as input.");
        }
        if (!clusterModel.newBrokers().isEmpty() && !broker.isNew()) {
            return true;
        }
        Set<String> excludedTopics = optimizationOptions.excludedTopics();
        Set<Integer> excludedBrokersForLeadership = optimizationOptions.excludedBrokersForLeadership();
        Set<Integer> excludedBrokersForReplicaMove = optimizationOptions.excludedBrokersForReplicaMove();
        // A broker excluded for leadership may only receive followers.
        boolean moveFollowersOnly = excludedBrokersForLeadership.contains(broker.id());

        PriorityQueue<CandidateBroker> candidateBrokerPQ = new PriorityQueue<CandidateBroker>();
        for (Broker candidate : clusterModel.eligibleSourceBrokers()) {
            if (!AnalyzerUtils.isLarger(GoalUtils.utilizationPercentage(candidate, resource()), thresholds.meanUtilizationPercent())) {
                continue;
            }
            TreeSet<Replica> replicasToMoveIn = new TreeSet<Replica>(ReplicaByResourceUtilizationComparator.of(resource()));
            // Keep replicas of non-excluded topics that carry a positive load for this resource.
            ChainReplicaFilter replicaFilter = new ChainReplicaFilter(replica -> !ResourceDistributionAbstractGoal.shouldExclude(replica, excludedTopics), replica -> AnalyzerUtils.isLarger(replica.load().expectedUtilizationFor(resource()), 0.0));
            Set<Replica> filteredReplicas = GoalUtils.filterReplicas(candidate, moveFollowersOnly, false, moveImmigrantsOnly, replicaFilter);
            replicasToMoveIn.addAll(filteredReplicas);
            candidateBrokerPQ.add(new CandidateBroker(candidate, resource(), replicasToMoveIn, false, excludedBrokersForLeadership, excludedBrokersForReplicaMove));
        }

        // Leadership can only move in while the broker still hosts at least one follower.
        while (actionType == ActionType.INTER_BROKER_REPLICA_MOVEMENT
            || (actionType == ActionType.LEADERSHIP_MOVEMENT && broker.leaderReplicas().size() != broker.replicas().size())) {
            // Polled into a per-iteration final local so the reason lambda below may capture it.
            final CandidateBroker cb = candidateBrokerPQ.poll();
            if (cb == null) {
                break;
            }
            Iterator<Replica> iterator = cb.replicas().iterator();
            while (iterator.hasNext()) {
                Replica candidateReplica = iterator.next();
                double replicaResourceUtilization = candidateReplica.load().expectedUtilizationFor(resource());
                double currentBrokerResourceUtilization = broker.load(resource()).expectedUtilizationFor(resource());
                DetailedProposal.DetailedReasonBuilder detailedReasonBuilder = destination -> String.format("Moving %.2f of %s load in from broker %d to %s in order to increase the usage on Broker %d by %.2f - Current utilization: %.2f - Target utilization: %.2f", replicaResourceUtilization, resource(), cb.broker().id(), destination, broker.id(), replicaResourceUtilization, currentBrokerResourceUtilization, thresholds.balanceLowerPercent());

                Optional<DetailedProposal.Builder> detailedProposalOptional = Optional.empty();
                if (proposalTrackingOptions().isEnabled) {
                    // NOTE(review): the receiving broker's id is passed here, whereas
                    // rebalanceByMovingLoadOut passes the broker the replica leaves - confirm which
                    // broker the second builder argument is meant to identify.
                    detailedProposalOptional = Optional.of(DetailedProposal.builder(candidateReplica.topicPartition(), broker.id(), replicaResourceUtilization, actionType));
                }

                boolean appliedBalancingAction = maybeApplyBalancingAction(clusterModel, candidateReplica, Collections.singletonList(broker), actionType, optimizedGoals, optimizationOptions, detailedReasonBuilder, detailedProposalOptional) != null;
                if (!appliedBalancingAction) {
                    continue;
                }
                if (isRebalanceByMovingLoadInCompleted(broker)) {
                    LOG.debug("Successfully balanced {} for broker {} by moving {} in.", resource(), broker.id(), actionType);
                    return false;
                }
                if (actionType == ActionType.INTER_BROKER_REPLICA_MOVEMENT) {
                    // The replica has left the source, so it is no longer a candidate from it.
                    iterator.remove();
                }
                CandidateBroker nextSource = candidateBrokerPQ.peek();
                if (nextSource != null
                    && AnalyzerUtils.isSmaller(GoalUtils.utilizationPercentage(cb.broker(), resource()), GoalUtils.utilizationPercentage(nextSource.broker(), resource()))) {
                    // The current source is now less utilized than the next one: requeue it and
                    // switch to the next source.
                    candidateBrokerPQ.add(cb);
                    break;
                }
            }
        }
        return true;
    }

    /**
     * Completion check consulted by {@code rebalanceByMovingLoadIn} after each successfully applied
     * action. Returning {@code true} makes that method stop and report that no more load needs to
     * be moved in.
     *
     * @param var1 the broker load is being moved into
     * @return {@code true} when moving load into the broker is complete
     */
    protected abstract boolean isRebalanceByMovingLoadInCompleted(Broker var1);

    /**
     * Checks the broker's current load against the balance lower limit, without applying any
     * hypothetical change.
     *
     * @param broker the broker to check
     * @return the result of {@code isLoadAboveBalanceLowerLimitAfterChange} for a {@code null} load
     */
    protected boolean isLoadAboveBalanceLowerLimit(Broker broker) {
        // A null load is treated as a zero delta by the delegate.
        final Load noLoadChange = null;
        return isLoadAboveBalanceLowerLimitAfterChange(noLoadChange, broker, ChangeType.ADD);
    }

    /**
     * Checks the broker's current load against the balance upper limit, without applying any
     * hypothetical change.
     *
     * @param broker the broker to check
     * @return the result of {@code isLoadUnderBalanceUpperLimitAfterChange} for a {@code null} load
     */
    protected boolean isLoadUnderBalanceUpperLimit(Broker broker) {
        // A null load is treated as a zero delta by the delegate.
        final Load noLoadChange = null;
        return isLoadUnderBalanceUpperLimitAfterChange(noLoadChange, broker, ChangeType.REMOVE);
    }

    /**
     * Checks whether the broker would be at or above the balance lower limit after adding or
     * removing the given load.
     *
     * <p>The limit is the broker's total capacity for {@link #resource()} multiplied by the balance
     * lower percentage. For host resources the same check is also made at host level, and the
     * result is {@code true} when either the host or the broker passes.
     *
     * @param load the load to add or remove; {@code null} means no change
     * @param broker the broker to check
     * @param changeType {@code ADD} to add the load; any other value subtracts it
     * @return {@code true} if the resulting utilization is not smaller than the lower limit
     */
    protected boolean isLoadAboveBalanceLowerLimitAfterChange(Load load, Broker broker, ChangeType changeType) {
        double utilizationDelta = load == null ? 0.0 : load.expectedUtilizationFor(resource());

        double brokerLowerLimit = broker.capacity().totalCapacityFor(resource()) * thresholds.balanceLowerPercent();
        double projectedBrokerUtilization = broker.load().expectedUtilizationFor(resource());
        if (changeType == ChangeType.ADD) {
            projectedBrokerUtilization = projectedBrokerUtilization + utilizationDelta;
        } else {
            projectedBrokerUtilization = projectedBrokerUtilization - utilizationDelta;
        }
        boolean brokerAboveLowerLimit = !AnalyzerUtils.isSmaller(projectedBrokerUtilization, brokerLowerLimit);

        if (!resource().isHostResource()) {
            return brokerAboveLowerLimit;
        }

        double hostLowerLimit = broker.host().capacity().totalCapacityFor(resource()) * thresholds.balanceLowerPercent();
        double projectedHostUtilization = broker.host().load().expectedUtilizationFor(resource());
        if (changeType == ChangeType.ADD) {
            projectedHostUtilization = projectedHostUtilization + utilizationDelta;
        } else {
            projectedHostUtilization = projectedHostUtilization - utilizationDelta;
        }
        boolean hostAboveLowerLimit = !AnalyzerUtils.isSmaller(projectedHostUtilization, hostLowerLimit);
        // Either level being above the limit is sufficient for a host resource.
        return hostAboveLowerLimit || brokerAboveLowerLimit;
    }

    /**
     * Checks whether the broker would be at or below the balance upper limit after adding or
     * removing the given load.
     *
     * <p>The limit is the broker's total capacity for {@link #resource()} multiplied by the balance
     * upper percentage. For host resources the same check is also made at host level, and the
     * result is {@code true} when either the host or the broker passes.
     *
     * @param load the load to add or remove; {@code null} means no change
     * @param broker the broker to check
     * @param changeType {@code ADD} to add the load; any other value subtracts it
     * @return {@code true} if the resulting utilization is not larger than the upper limit
     */
    protected boolean isLoadUnderBalanceUpperLimitAfterChange(Load load, Broker broker, ChangeType changeType) {
        double utilizationDelta = load == null ? 0.0 : load.expectedUtilizationFor(resource());

        double brokerUpperLimit = broker.capacity().totalCapacityFor(resource()) * thresholds.balanceUpperPercent();
        double projectedBrokerUtilization = broker.load().expectedUtilizationFor(resource());
        if (changeType == ChangeType.ADD) {
            projectedBrokerUtilization = projectedBrokerUtilization + utilizationDelta;
        } else {
            projectedBrokerUtilization = projectedBrokerUtilization - utilizationDelta;
        }
        boolean brokerUnderUpperLimit = !AnalyzerUtils.isLarger(projectedBrokerUtilization, brokerUpperLimit);

        if (!resource().isHostResource()) {
            return brokerUnderUpperLimit;
        }

        double hostUpperLimit = broker.host().capacity().totalCapacityFor(resource()) * thresholds.balanceUpperPercent();
        double projectedHostUtilization = broker.host().load().expectedUtilizationFor(resource());
        if (changeType == ChangeType.ADD) {
            projectedHostUtilization = projectedHostUtilization + utilizationDelta;
        } else {
            projectedHostUtilization = projectedHostUtilization - utilizationDelta;
        }
        boolean hostUnderUpperLimit = !AnalyzerUtils.isLarger(projectedHostUtilization, hostUpperLimit);
        // Either level being under the limit is sufficient for a host resource.
        return hostUnderUpperLimit || brokerUnderUpperLimit;
    }

    /**
     * Builds the name under which this goal's sorted replicas are tracked in the cluster model.
     *
     * @return the goal name, the resource name and the literal {@code ALL}, joined by dashes
     */
    protected String sortName() {
        return String.join("-", name(), resource().name(), "ALL");
    }

    /**
     * Direction of a hypothetical load change evaluated by the {@code is...AfterChange} checks.
     */
    protected static enum ChangeType {
        // The load delta is added to the current utilization.
        ADD,
        // The load delta is subtracted from the current utilization.
        REMOVE;

    }
}

