/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.ReplicaBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ResourceDistributionAbstractGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.internals.BrokerResourceStats;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.internals.IncrementalResourceDistributionStatsSnapshot;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.Capacity;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Load;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.util.BrokerByResourceUtilizationComparator;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class IncrementalResourceDistributionGoal
extends ResourceDistributionAbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalResourceDistributionGoal.class);
    private double incrementalStepRatio;
    private double incrementalLowerBound;
    private IncrementalResourceDistributionStatsSnapshot statsSnapshot;

    public IncrementalResourceDistributionGoal() {
    }

    IncrementalResourceDistributionGoal(BalancingConstraint constraint) {
        super(constraint);
    }

    double incrementalStepRatio() {
        return this.incrementalStepRatio;
    }

    double incrementalLowerBound() {
        return this.incrementalLowerBound;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        super.configure(configs);
        KafkaCruiseControlConfig parsedConfig = new KafkaCruiseControlConfig(configs, false);
        this.incrementalStepRatio = parsedConfig.getDouble("incremental.balancing.step.ratio");
        this.incrementalLowerBound = parsedConfig.getDouble("incremental.balancing.lower.bound");
    }

    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(1, this.minMonitoredPartitionPercentage, false);
    }

    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions, Optional<OptimizationMetrics> optimizationMetricsOpt) throws OptimizationFailureException {
        super.initGoalState(clusterModel, optimizationOptions, optimizationMetricsOpt);
        this.statsSnapshot = new IncrementalResourceDistributionStatsSnapshot(clusterModel.eligibleSourceOrDestinationBrokers().stream().collect(Collectors.toMap(Broker::id, broker -> this.calculateIncrementalBalancingPercentageThreshold((Broker)broker, this.incrementalStepRatio))), this.incrementalLowerBound, this.incrementalStepRatio, this.thresholds.meanUtilizationPercent());
        if (optimizationMetricsOpt.isPresent()) {
            OptimizationMetrics metrics = optimizationMetricsOpt.get();
            metrics.recordIncrementalDistributionBalanceStats(this, this.statsSnapshot);
        }
        LOG.debug("Incremental balancing stats {}", (Object)this.statsSnapshot);
    }

    private double calculateIncrementalBalancingPercentageThreshold(Broker broker, double incrementalStepRatio) {
        double initialResourceUtilizationRatio = Optional.ofNullable(this.initialResourceDistribution.get(broker.id())).map(BrokerResourceStats::utilizationRatio).orElse(1.0);
        return Math.abs(this.thresholds.meanUtilizationPercent() - initialResourceUtilizationRatio) * incrementalStepRatio;
    }

    @Override
    public ActionAcceptance replicaActionAcceptance(ReplicaBalancingAction action, ClusterModel clusterModel) {
        Replica replica = clusterModel.broker(action.sourceBrokerId()).replica(action.topicPartition());
        if (replica.isCurrentOffline()) {
            if (action.balancingAction() == ActionType.INTER_BROKER_REPLICA_MOVEMENT) {
                return ActionAcceptance.ACCEPT;
            }
            return ActionAcceptance.REPLICA_REJECT;
        }
        if (this.isGettingMoreBalanced(clusterModel, action)) {
            return ActionAcceptance.ACCEPT;
        }
        return ActionAcceptance.REPLICA_REJECT;
    }

    @Override
    protected SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        SortedSet<Broker> brokersToBalance = clusterModel.newBrokers().isEmpty() ? clusterModel.eligibleSourceOrDestinationBrokers() : clusterModel.newBrokers();
        TreeSet<Broker> brokersToBalanceByResourceUtilizationInReverse = new TreeSet<Broker>(BrokerByResourceUtilizationComparator.of(this.resource(), true));
        brokersToBalanceByResourceUtilizationInReverse.addAll(brokersToBalance);
        return brokersToBalanceByResourceUtilizationInReverse;
    }

    @Override
    public boolean replicaActionSelfSatisfied(ClusterModel clusterModel, ReplicaBalancingAction action) {
        Replica replica = clusterModel.broker(action.sourceBrokerId()).replica(action.topicPartition());
        if (replica.isCurrentOffline()) {
            return action.balancingAction() == ActionType.INTER_BROKER_REPLICA_MOVEMENT;
        }
        return this.isGettingMoreBalanced(clusterModel, action);
    }

    @Override
    protected void updateGoalState(ClusterModel clusterModel, Set<String> excludedTopics) throws OptimizationFailureException {
        GoalUtils.ensureNoOfflineReplicas(clusterModel, this.name());
        GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, this.name());
        this.finish();
        clusterModel.untrackSortedReplicas(this.sortName());
    }

    @Override
    protected boolean isRebalanceByMovingLoadOutCompleted(Broker broker) {
        return this.isIncrementalBalancingThresholdReached(broker);
    }

    @Override
    protected boolean isRebalanceByMovingLoadInCompleted(Broker broker) {
        return this.isIncrementalBalancingThresholdReached(broker);
    }

    @Override
    protected void doRebalance(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
        this.performReplicaMovement(broker, clusterModel, optimizedGoals, optimizationOptions);
        this.performLeadershipMovement(broker, clusterModel, optimizedGoals, optimizationOptions);
    }

    private boolean isIncrementalBalancingThresholdReached(Broker broker) {
        double initialResourceUtilizationRatio = Optional.ofNullable(this.initialResourceDistribution.get(broker.id())).map(BrokerResourceStats::utilizationRatio).orElse(1.0);
        double currentResourceUtilizationRatio = GoalUtils.utilizationPercentage(broker, this.resource());
        double incrementalBalancingThreshold = this.statsSnapshot.desiredIncrementalImprovementPercent(broker.id());
        double currentBalancingRoundUtilizationDelta = Math.abs(currentResourceUtilizationRatio - initialResourceUtilizationRatio);
        LOG.trace("Broker {}, currentResourceUtilizationDelta: {}, incrementalBalancingThreshold: {}", new Object[]{broker.id(), currentBalancingRoundUtilizationDelta, incrementalBalancingThreshold});
        boolean reachedDelta = AnalyzerUtils.isLarger(currentBalancingRoundUtilizationDelta, incrementalBalancingThreshold) || AnalyzerUtils.isEqual(currentBalancingRoundUtilizationDelta, incrementalBalancingThreshold);
        return reachedDelta;
    }

    private boolean isGettingMoreBalanced(ClusterModel clusterModel, ReplicaBalancingAction action) {
        Broker sourceBroker = clusterModel.broker(action.sourceBrokerId());
        Broker destinationBroker = clusterModel.broker(action.destinationBrokerId());
        TopicPartition tp = action.topicPartition();
        ActionType actionType = action.balancingAction();
        if (actionType != ActionType.INTER_BROKER_REPLICA_MOVEMENT && actionType != ActionType.LEADERSHIP_MOVEMENT) {
            throw new UnsupportedOperationException(String.format("Balancing action type %s is not supported by %s", new Object[]{actionType, this.getClass().getSimpleName()}));
        }
        LOG.trace("Check whether replica balancing action {}, produces a more balanced ClusterModel.", (Object)action);
        Capacity sourceCapacity = sourceBroker.capacity(this.resource());
        Capacity destinationCapacity = destinationBroker.capacity(this.resource());
        Load sourceLoadBefore = sourceBroker.load(this.resource());
        Load destinationLoadBefore = destinationBroker.load(this.resource());
        Load sourceReplicaLoad = sourceBroker.replica(tp).load();
        Load destinationReplicaLoad = Optional.ofNullable(destinationBroker.replica(tp)).map(Replica::load).orElse(null);
        LOG.trace("Calculate the utilization deviation sum before applying the balancing action...");
        double utilizationDeviationSumBefore = this.calculateUtilizationDeviationSum(sourceLoadBefore, sourceCapacity, destinationLoadBefore, destinationCapacity);
        Load sourceLoadAfter = Load.builder().base(sourceLoadBefore).subtractLoad(sourceReplicaLoad).addLoad(destinationReplicaLoad).build();
        Load destinationLoadAfter = Load.builder().base(destinationLoadBefore).addLoad(sourceReplicaLoad).subtractLoad(destinationReplicaLoad).build();
        LOG.trace("Calculate the utilization deviation sum after applying the balancing action...");
        double utilizationDeviationSumAfter = this.calculateUtilizationDeviationSum(sourceLoadAfter, sourceCapacity, destinationLoadAfter, destinationCapacity);
        return AnalyzerUtils.isLarger(utilizationDeviationSumBefore, utilizationDeviationSumAfter);
    }

    private double calculateUtilizationDeviationSum(Load sourceLoad, Capacity sourceCapacity, Load destinationLoad, Capacity destinationCapacity) {
        double sourceUtilization = sourceLoad.expectedUtilizationFor(this.resource());
        double destinationUtilization = destinationLoad.expectedUtilizationFor(this.resource());
        double sourceResourceCapacity = sourceCapacity.totalCapacityFor(this.resource());
        double destinationResourceCapacity = destinationCapacity.totalCapacityFor(this.resource());
        double sourceMeanUtilization = this.thresholds.meanUtilizationPercent() * sourceResourceCapacity;
        double destinationMeanUtilization = this.thresholds.meanUtilizationPercent() * destinationResourceCapacity;
        double utilizationDeviationSum = Math.abs(sourceUtilization - sourceMeanUtilization) + Math.abs(destinationUtilization - destinationMeanUtilization);
        LOG.trace("Source util: {}, source capacity: {}, source mean utilization: {}, destination util: {}, destination capacity: {}, destination mean utilization: {}, utilization deviation sum: {}", new Object[]{sourceUtilization, sourceResourceCapacity, sourceMeanUtilization, destinationUtilization, destinationResourceCapacity, destinationMeanUtilization, utilizationDeviationSum});
        return utilizationDeviationSum;
    }
}

