/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.PartitionBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.ReplicaBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Disk;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IntraBrokerDiskCapacityGoal
extends AbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger(IntraBrokerDiskCapacityGoal.class);
    private static final int MIN_NUM_VALID_WINDOWS = 1;
    private static final Resource RESOURCE = Resource.DISK;

    public IntraBrokerDiskCapacityGoal() {
    }

    IntraBrokerDiskCapacityGoal(BalancingConstraint constraint) {
        this.balancingConstraint = constraint;
    }

    @Override
    public boolean isHardGoal() {
        return true;
    }

    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions, Optional<OptimizationMetrics> optimizationMetricsOpt) throws OptimizationFailureException {
        for (Broker broker : clusterModel.aliveBrokers()) {
            double existingUtilization = broker.load().expectedUtilizationFor(RESOURCE);
            double allowedCapacity = this.balancingConstraint.allowedCapacityForBroker(RESOURCE, broker.capacity());
            if (!(allowedCapacity < existingUtilization)) continue;
            throw new OptimizationFailureException("Insufficient disk capacity at broker " + broker.id() + ", existing broker utilization " + existingUtilization + " exceeds allowed capacity " + allowedCapacity);
        }
    }

    @Override
    protected SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        return new TreeSet<Broker>(clusterModel.aliveBrokers());
    }

    @Override
    public ActionAcceptance replicaActionAcceptance(ReplicaBalancingAction action, ClusterModel clusterModel) {
        if (action.sourceBrokerLogdir() == null || action.destinationBrokerLogdir() == null) {
            throw new IllegalArgumentException(this.getClass().getSimpleName() + " does not support balancing action not specifying logdir.");
        }
        Replica sourceReplica = clusterModel.broker(action.sourceBrokerId()).replica(action.topicPartition());
        Disk destinationDisk = clusterModel.broker(action.destinationBrokerId()).disk(action.destinationBrokerLogdir());
        switch (action.balancingAction()) {
            case INTRA_BROKER_REPLICA_SWAP: {
                Replica destinationReplica = clusterModel.broker(action.destinationBrokerId()).replica(action.destinationTopicPartition());
                return this.isSwapAcceptableForCapacity(sourceReplica, destinationReplica) ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            }
            case INTRA_BROKER_REPLICA_MOVEMENT: {
                return this.isMovementAcceptableForCapacity(sourceReplica, destinationDisk) ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            }
            case LEADERSHIP_MOVEMENT: {
                return ActionAcceptance.ACCEPT;
            }
        }
        throw new IllegalArgumentException("Unsupported balancing action " + (Object)((Object)action.balancingAction()) + " is provided.");
    }

    @Override
    public ActionAcceptance partitionActionAcceptance(PartitionBalancingAction action, ClusterModel clusterModel) {
        throw new IllegalArgumentException("Unsupported balancing action " + (Object)((Object)action.balancingAction()) + " is provided.");
    }

    @Override
    public boolean replicaActionSelfSatisfied(ClusterModel clusterModel, ReplicaBalancingAction action) {
        Replica sourceReplica = clusterModel.broker(action.sourceBrokerId()).replica(action.topicPartition());
        Disk destinationDisk = clusterModel.broker(action.destinationBrokerId()).disk(action.destinationBrokerLogdir());
        return sourceReplica.load().expectedUtilizationFor(RESOURCE) > 0.0 && this.isMovementAcceptableForCapacity(sourceReplica, destinationDisk);
    }

    @Override
    public boolean partitionActionSelfSatisfied(ClusterModel clusterModel, PartitionBalancingAction action) {
        return false;
    }

    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
        LOG.debug("balancing broker {}, optimized goals = {}.", (Object)broker, optimizedGoals);
        List disksOverUtilized = broker.disks().stream().filter(Disk::isAlive).filter(this::isUtilizationOverLimit).collect(Collectors.toList());
        if (disksOverUtilized.isEmpty()) {
            return;
        }
        Set<String> excludedTopics = optimizationOptions.excludedTopics();
        ArrayList<Disk> candidateDisks = new ArrayList<Disk>(broker.disks());
        candidateDisks.removeAll(disksOverUtilized);
        candidateDisks.sort(new Comparator<Disk>(){

            @Override
            public int compare(Disk disk1, Disk disk2) {
                double allowanceForDisk1 = IntraBrokerDiskCapacityGoal.this.balancingConstraint.allowedCapacityForBroker(RESOURCE, disk1.capacity()) - disk1.utilization();
                double allowanceForDisk2 = IntraBrokerDiskCapacityGoal.this.balancingConstraint.allowedCapacityForBroker(RESOURCE, disk2.capacity()) - disk2.utilization();
                return Double.valueOf(allowanceForDisk2 - allowanceForDisk1).intValue();
            }
        });
        for (Disk disk : disksOverUtilized) {
            disk.trackSortedReplicas(this.name(), ReplicaSortFunctionFactory.selectOnlineReplicas(), ReplicaSortFunctionFactory.deprioritizeDiskImmigrants(), ReplicaSortFunctionFactory.sortByMetricResourceValue(RESOURCE));
            for (Replica replica : disk.trackedSortedReplicas(this.name()).reverselySortedReplicas()) {
                if (IntraBrokerDiskCapacityGoal.shouldExclude(replica, excludedTopics)) continue;
                Disk d = this.maybeMoveReplicaBetweenDisks(clusterModel, replica, candidateDisks, optimizedGoals);
                if (d == null) {
                    LOG.debug("Failed to move replica {} to any disk {} in broker {}", new Object[]{replica, candidateDisks, replica.broker()});
                }
                if (this.isUtilizationOverLimit(disk)) continue;
                break;
            }
            disk.untrackSortedReplicas(this.name());
        }
    }

    @Override
    protected void updateGoalState(ClusterModel clusterModel, Set<String> excludedTopics) throws OptimizationFailureException {
        for (Broker broker : this.brokersToBalance(clusterModel)) {
            for (Disk disk : broker.disks()) {
                if (!disk.isAlive() || !this.isUtilizationOverLimit(disk)) continue;
                throw new OptimizationFailureException(String.format("Optimization for goal %s failed because utilization for disk %s on broker %d is still above capacity limit.", this.name(), disk, broker.id()));
            }
        }
        this.finish();
    }

    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(1, this.minMonitoredPartitionPercentage, true);
    }

    @Override
    public String name() {
        return this.getClass().getSimpleName();
    }

    @Override
    public void finish() {
        this.finished = true;
    }

    private boolean isUtilizationOverLimit(Disk disk) {
        return disk.utilization() > this.balancingConstraint.allowedCapacityForBroker(RESOURCE, disk.capacity());
    }

    private boolean isMovementAcceptableForCapacity(Replica sourceReplica, Disk destinationDisk) {
        double replicaUtilization = sourceReplica.load().expectedUtilizationFor(RESOURCE);
        return this.isUtilizationUnderLimitAfterAddingLoad(destinationDisk, replicaUtilization);
    }

    private boolean isSwapAcceptableForCapacity(Replica sourceReplica, Replica destinationReplica) {
        double sourceReplicaUtilization = sourceReplica.load().expectedUtilizationFor(RESOURCE);
        double destinationReplicaUtilization = destinationReplica.load().expectedUtilizationFor(RESOURCE);
        double sourceUtilizationDelta = destinationReplicaUtilization - sourceReplicaUtilization;
        return sourceUtilizationDelta > 0.0 ? this.isUtilizationUnderLimitAfterAddingLoad(sourceReplica.disk(), sourceUtilizationDelta) : this.isUtilizationUnderLimitAfterAddingLoad(destinationReplica.disk(), -sourceUtilizationDelta);
    }

    private boolean isUtilizationUnderLimitAfterAddingLoad(Disk destinationDisk, double utilizationToAdd) {
        double capacityLimit = this.balancingConstraint.allowedCapacityForBroker(RESOURCE, destinationDisk.capacity());
        return destinationDisk.utilization() + utilizationToAdd < capacityLimit;
    }
}

