/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.PartitionBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.ReplicaBalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.EntityCombinator;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.GoalUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.RebalanceStep;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.internals.CapacityStatsSnapshot;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.metrics.OptimizationMetrics;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.Capacity;
import com.linkedin.kafka.cruisecontrol.model.Cell;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Load;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.model.ReplicaWrapper;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class CapacityGoal
extends AbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger(CapacityGoal.class);

    /**
     * Creates a capacity goal without assigning a balancing constraint in this constructor.
     */
    public CapacityGoal() {
    }

    /**
     * Reports that every capacity goal is a hard goal.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isHardGoal() {
        return true;
    }

    /**
     * Package-private constructor that installs the given balancing constraint on this goal.
     *
     * @param constraint the balancing constraint stored in {@code balancingConstraint}
     */
    CapacityGoal(BalancingConstraint constraint) {
        this.balancingConstraint = constraint;
    }

    /**
     * Returns the resource whose capacity this goal enforces; supplied by each concrete subclass.
     *
     * @return the resource checked by all capacity computations in this class
     */
    protected abstract Resource resource();

    /**
     * Decides whether the given replica balancing action keeps the affected broker (and host)
     * under the allowed capacity limit for {@link #resource()}.
     *
     * <p>Swaps are validated with {@code isSwapAcceptableForCapacity}; replica movements and
     * leadership movements are validated with {@code isMovementAcceptableForCapacity}.
     *
     * @param action the proposed action
     * @param clusterModel the cluster model used to resolve the source replica and destination broker
     * @return {@link ActionAcceptance#ACCEPT} when the capacity check passes,
     *         {@link ActionAcceptance#REPLICA_REJECT} otherwise
     * @throws IllegalArgumentException if the action type is none of the three handled types
     */
    @Override
    public ActionAcceptance replicaActionAcceptance(ReplicaBalancingAction action, ClusterModel clusterModel) {
        Replica movedReplica = clusterModel.broker(action.sourceBrokerId()).replica(action.topicPartition());
        Broker targetBroker = clusterModel.broker(action.destinationBrokerId());
        boolean acceptable;
        switch (action.balancingAction()) {
            case INTER_BROKER_REPLICA_SWAP:
                acceptable = isSwapAcceptableForCapacity(movedReplica, targetBroker.replica(action.destinationTopicPartition()));
                break;
            case INTER_BROKER_REPLICA_MOVEMENT:
            case LEADERSHIP_MOVEMENT:
                acceptable = isMovementAcceptableForCapacity(movedReplica, targetBroker);
                break;
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + action.balancingAction() + " is provided.");
        }
        return acceptable ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
    }

    /**
     * Accepts a partition balancing action only if every individual replica move it contains
     * passes the movement capacity check against its destination broker.
     *
     * @param action the proposed partition action; its {@code replicaMoves()} map is checked entry by entry
     * @param clusterModel the cluster model (not consulted by this implementation)
     * @return {@link ActionAcceptance#ACCEPT} if all moves fit, otherwise {@link ActionAcceptance#REPLICA_REJECT}
     */
    @Override
    public ActionAcceptance partitionActionAcceptance(PartitionBalancingAction action, ClusterModel clusterModel) {
        boolean everyMoveFits = action.replicaMoves().entrySet().stream()
            .allMatch(move -> isMovementAcceptableForCapacity(move.getKey(), move.getValue()));
        if (everyMoveFits) {
            return ActionAcceptance.ACCEPT;
        }
        return ActionAcceptance.REPLICA_REJECT;
    }

    /**
     * Builds the model completeness requirements for this goal from the fixed value {@code 1},
     * the configured {@code minMonitoredPartitionPercentage} and the flag {@code true}.
     *
     * @return a new {@link ModelCompletenessRequirements} instance
     */
    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        // NOTE(review): presumably (min valid windows, min monitored partition percentage,
        // include all topics) — confirm against ModelCompletenessRequirements.
        return new ModelCompletenessRequirements(1, this.minMonitoredPartitionPercentage, true);
    }

    /**
     * Returns the name of the concrete goal; it is used in error messages and in the names of
     * the tracked replica sort orders.
     *
     * @return the goal name
     */
    @Override
    public abstract String name();

    /**
     * Checks whether the destination broker of the given action stays under the allowed capacity
     * limit once the load of the source replica is added to it.
     *
     * @param clusterModel the cluster model used to resolve the source replica and destination broker
     * @param action the action to check
     * @return {@code true} if the movement capacity check passes
     */
    @Override
    public boolean replicaActionSelfSatisfied(ClusterModel clusterModel, ReplicaBalancingAction action) {
        return isMovementAcceptableForCapacity(
            clusterModel.broker(action.sourceBrokerId()).replica(action.topicPartition()),
            clusterModel.broker(action.destinationBrokerId()));
    }

    /**
     * Reports every partition balancing action as self-satisfied; no capacity check is performed here.
     *
     * @param clusterModel the cluster model (unused)
     * @param action the partition action (unused)
     * @return always {@code true}
     */
    @Override
    public boolean partitionActionSelfSatisfied(ClusterModel clusterModel, PartitionBalancingAction action) {
        return true;
    }

    /**
     * Selects the brokers this goal will try to balance: the cluster model's eligible source brokers.
     *
     * @param clusterModel the cluster model to read the brokers from
     * @return the result of {@code clusterModel.eligibleSourceBrokers()}
     */
    @Override
    protected SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        return clusterModel.eligibleSourceBrokers();
    }

    /**
     * Performs the feasibility checks for this goal and prepares the cluster model for balancing.
     *
     * <p>In order: verifies the ignored brokers are under their capacity limit, verifies the
     * cluster-wide utilization does not exceed the summed allowed capacity of the alive brokers,
     * verifies the eligible-source utilization fits into the eligible-destination capacity,
     * registers the tracked replica sort orders, and finally (when metrics are supplied) hands
     * the total number of hot partitions to {@code maybeRegisterMetrics}.
     *
     * @param clusterModel the cluster model to inspect and prepare
     * @param optimizationOptions the optimization options (not consulted by this implementation)
     * @param optimizationMetricsOpt optional sink for capacity statistics
     * @throws OptimizationFailureException if any of the capacity checks fails
     */
    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions, Optional<OptimizationMetrics> optimizationMetricsOpt) throws OptimizationFailureException {
        Load clusterLoad = clusterModel.load();
        ensureUtilizationUnderCapacityForBrokers(clusterModel.ignoredBrokers());
        double clusterUtilization = clusterLoad.expectedUtilizationFor(resource());
        double clusterAllowedCapacity = clusterModel.aliveBrokers().stream()
            .mapToDouble(b -> balancingConstraint.allowedCapacityForBroker(resource(), b.capacity()))
            .sum();
        if (clusterAllowedCapacity < clusterUtilization) {
            // The total capacity is only needed for the error message.
            double clusterTotalCapacity = clusterModel.aliveBrokers().stream()
                .mapToDouble(b -> b.capacity().totalCapacityFor(resource()))
                .sum();
            throw new OptimizationFailureException(String.format(
                "[%s] Insufficient healthy cluster capacity for resource: %s existing cluster utilization %f%s allowed capacity %f%s (total capacity %f%s).",
                name(), resource(), clusterUtilization, resource().unit(), clusterAllowedCapacity, resource().unit(), clusterTotalCapacity, resource().unit()));
        }
        ensureEligibleUtilizationUnderEligibleCapacity(clusterModel);
        Map<Integer, List<TopicPartition>> hotPartitionsByBroker = initClusterModel(clusterModel);
        if (optimizationMetricsOpt.isPresent()) {
            int hotPartitionCount = 0;
            for (List<TopicPartition> brokerHotPartitions : hotPartitionsByBroker.values()) {
                hotPartitionCount += brokerHotPartitions.size();
            }
            maybeRegisterMetrics(optimizationMetricsOpt.get(), clusterModel, hotPartitionCount);
        }
    }

    /**
     * Records a capacity stats snapshot for this goal, provided
     * {@code GoalUtils.validateEvenBrokerResourceCapacities} yields a capacity value for the
     * alive brokers; otherwise logs a warning and records nothing.
     *
     * @param optimizationMetrics the sink that receives the snapshot
     * @param clusterModel the cluster model whose alive brokers supply the per-broker total capacities
     * @param numHotPartitions the number of hot partitions to include in the snapshot
     */
    private void maybeRegisterMetrics(OptimizationMetrics optimizationMetrics, ClusterModel clusterModel, int numHotPartitions) {
        Map<Integer, Double> totalCapacityByBrokerId = clusterModel.aliveBrokers().stream()
            .collect(Collectors.toMap(Broker::id, aliveBroker -> aliveBroker.capacity(resource()).totalCapacityFor(resource())));
        Optional<Double> validatedCapacity = GoalUtils.validateEvenBrokerResourceCapacities(totalCapacityByBrokerId, resource());
        if (validatedCapacity.isPresent()) {
            double allowedLimit = balancingConstraint.allowedCapacityForBroker(resource(), validatedCapacity.get());
            CapacityStatsSnapshot snapshot = new CapacityStatsSnapshot(allowedLimit, resource(), numHotPartitions);
            optimizationMetrics.recordCapacityStats(this, snapshot);
            return;
        }
        LOG.warn("Will not be reporting metrics as part of evaluating whether to trigger the even-cluster load task for {} because a capacity stats snapshot could not be computed. Check the previous logs for more information, most notably whether the cluster had the same capacities per broker.", getClass());
    }

    /**
     * Verifies that the load carried by the eligible source brokers fits into the allowed
     * capacity of those of them that are also eligible destinations.
     *
     * @param clusterModel the cluster model whose eligible source brokers are examined
     * @throws OptimizationFailureException if the summed utilization exceeds the summed allowed capacity
     */
    private void ensureEligibleUtilizationUnderEligibleCapacity(ClusterModel clusterModel) throws OptimizationFailureException {
        Resource res = resource();
        double sourceUtilization = 0.0;
        double destinationCapacity = 0.0;
        for (Broker candidate : clusterModel.eligibleSourceBrokers()) {
            sourceUtilization += candidate.load().expectedUtilizationFor(res);
            // Only brokers that may also receive replicas contribute capacity.
            if (candidate.isEligibleDestination()) {
                Capacity candidateCapacity = candidate.capacity(res);
                destinationCapacity += balancingConstraint.allowedCapacityForBroker(res, candidateCapacity);
            }
        }
        if (sourceUtilization > destinationCapacity) {
            String failure = String.format(
                "Optimization for goal %s failed because the utilization from eligible source brokers for resource %s is %f which is above the eligible capacity limit %f for the cluster (eligible destination brokers).",
                name(), res.resource(), sourceUtilization, destinationCapacity);
            throw new OptimizationFailureException(failure);
        }
    }

    /**
     * Registers the two tracked replica sort orders used by this goal on the cluster model and
     * then returns the hot partitions found per broker.
     *
     * @param clusterModel the cluster model to prepare
     * @return hot partitions keyed by broker id, as computed by {@code analyzeHotPartitions}
     */
    Map<Integer, List<TopicPartition>> initClusterModel(ClusterModel clusterModel) {
        // Sort order over replicas (null selection argument), keyed by sortName().
        clusterModel.trackSortedReplicas(this.sortName(), null, ReplicaSortFunctionFactory.deprioritizeOfflineReplicasThenImmigrants(), ReplicaSortFunctionFactory.sortByMetricResourceValue(this.resource()));
        // Sort order restricted with selectLeaders(), keyed by sortNameByLeader().
        clusterModel.trackSortedReplicas(this.sortNameByLeader(), ReplicaSortFunctionFactory.selectLeaders(), ReplicaSortFunctionFactory.deprioritizeImmigrants(), ReplicaSortFunctionFactory.sortByMetricResourceValue(this.resource()));
        return this.analyzeHotPartitions(clusterModel);
    }

    /**
     * Runs the final validations for this goal and marks it finished.
     *
     * <p>The tracked replica sort orders registered under {@code sortName()} and
     * {@code sortNameByLeader()} are always untracked, whether or not a validation fails.
     *
     * @param clusterModel the cluster model to validate
     * @param excludedTopics the excluded topics (not consulted by this implementation)
     * @throws OptimizationFailureException if a broker is over its capacity limit or one of the
     *         {@code GoalUtils} checks fails
     */
    @Override
    protected void updateGoalState(ClusterModel clusterModel, Set<String> excludedTopics) throws OptimizationFailureException {
        try {
            this.ensureUtilizationUnderCapacityForBrokers(clusterModel.allBrokers());
            GoalUtils.ensureNoOfflineReplicas(clusterModel, this.name());
            GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, this.name());
            this.finish();
        }
        finally {
            // Clean up the sort orders even when one of the checks above throws.
            clusterModel.untrackSortedReplicas(this.sortName());
            clusterModel.untrackSortedReplicas(this.sortNameByLeader());
        }
    }

    /**
     * Marks this goal as finished by setting the {@code finished} flag.
     */
    @Override
    public void finish() {
        this.finished = true;
    }

    /**
     * Verifies that none of the given brokers (nor their hosts) exceeds the allowed capacity
     * limit for {@link #resource()}.
     *
     * <p>The host-level check applies when the resource is a host resource and the host holds
     * replicas; the broker-level check applies when the resource is a broker resource and the
     * broker holds replicas.
     *
     * @param brokers the brokers to check
     * @throws OptimizationFailureException on the first host or broker found over its limit
     */
    private void ensureUtilizationUnderCapacityForBrokers(Set<Broker> brokers) throws OptimizationFailureException {
        Resource res = resource();
        for (Broker candidate : brokers) {
            if (res.isHostResource()) {
                double hostUtilization = candidate.host().load().expectedUtilizationFor(res);
                double hostLimit = balancingConstraint.allowedCapacityForBroker(res, candidate.host().capacity());
                if (!candidate.host().replicas().isEmpty() && hostUtilization > hostLimit) {
                    throw new OptimizationFailureException(String.format(
                        "Optimization for goal %s failed because %s utilization for host %s is %f which is above capacity limit %f.",
                        name(), res, candidate.host().name(), hostUtilization, hostLimit));
                }
            }
            if (res.isBrokerResource()) {
                double brokerUtilization = candidate.load().expectedUtilizationFor(res);
                double brokerLimit = balancingConstraint.allowedCapacityForBroker(res, candidate.capacity());
                if (!candidate.replicas().isEmpty() && brokerUtilization > brokerLimit) {
                    throw new OptimizationFailureException(String.format(
                        "Optimization for goal %s failed because %s utilization for broker %d is %f which is above capacity limit %f.",
                        name(), res, candidate.id(), brokerUtilization, brokerLimit));
                }
            }
        }
    }

    /**
     * Tries to bring the given broker under its capacity limits for {@link #resource()}.
     *
     * <p>A chain of rebalance steps is built with {@code onFailureThen}: a check of whether the
     * broker needs rebalancing at all, then leadership movement, replica movement and partition
     * movement. After the chain has run, the broker is checked against the allowed (not the
     * desired) limits and for remaining offline replicas.
     *
     * @param broker the broker to balance
     * @param clusterModel the cluster model to act on
     * @param optimizedGoals the goals that have already been optimized
     * @param optimizationOptions the optimization options; its excluded topics are passed to the steps
     * @throws OptimizationFailureException if the broker is still over the allowed limit or still
     *         holds offline replicas afterwards
     */
    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        LOG.debug("balancing broker {}, optimized goals = {}", broker, optimizedGoals);
        Resource res = resource();
        double allowedBrokerLimit = balancingConstraint.allowedCapacityForBroker(res, broker.capacity());
        double desiredBrokerLimit = balancingConstraint.desiredCapacityForBroker(res, broker.capacity());
        double allowedHostLimit = balancingConstraint.allowedCapacityForBroker(res, broker.host().capacity());
        double desiredHostLimit = balancingConstraint.desiredCapacityForBroker(res, broker.host().capacity());
        RebalanceContext context = new RebalanceContext(
            broker, res, allowedBrokerLimit, allowedHostLimit, desiredBrokerLimit, desiredHostLimit,
            clusterModel, optimizationOptions.excludedTopics(), optimizedGoals, optimizationOptions);
        RebalanceStep steps = new CheckShouldRebalance(context)
            .onFailureThen(new LeadershipMovementRebalance(context))
            .onFailureThen(new ReplicaMovementRebalance(context))
            .onFailureThen(new PartitionMovementRebalance(context));
        steps.balance();
        boolean stillOverLimit = isUtilizationOverLimit(broker, allowedBrokerLimit, allowedHostLimit);
        postSanityCheck(stillOverLimit, broker, allowedBrokerLimit, allowedHostLimit);
    }

    /**
     * Fails the optimization if the broker is still over its capacity limit or still holds
     * offline replicas after rebalancing.
     *
     * <p>When over the limit, a host-oriented message is used if the resource is a host resource
     * and a broker-oriented message otherwise.
     *
     * @param utilizationOverLimit whether the broker was found over the allowed limit
     * @param broker the broker that was rebalanced
     * @param brokerCapacityLimit the allowed broker-level limit, used in the error message
     * @param hostCapacityLimit the allowed host-level limit, used in the error message
     * @throws OptimizationFailureException if either condition holds
     */
    private void postSanityCheck(boolean utilizationOverLimit, Broker broker, double brokerCapacityLimit, double hostCapacityLimit) throws OptimizationFailureException {
        if (utilizationOverLimit) {
            Resource res = resource();
            String failure;
            if (res.isHostResource()) {
                failure = String.format(
                    "[%s] Violated capacity limit of %f via host utilization of %f with hostname %s for resource %s.",
                    name(), hostCapacityLimit, broker.host().load().expectedUtilizationFor(res), broker.host().name(), res);
            } else {
                failure = String.format(
                    "[%s] Violated capacity limit of %f via broker utilization of %f with broker %d for resource %s.",
                    name(), brokerCapacityLimit, broker.load().expectedUtilizationFor(res), broker.id(), res);
            }
            throw new OptimizationFailureException(failure);
        }
        boolean hasOfflineReplicas = !broker.currentOfflineReplicas().isEmpty();
        if (hasOfflineReplicas) {
            throw new OptimizationFailureException("Failed to remove offline replicas from broker " + broker.id() + ".");
        }
    }

    /**
     * Builds the key of the tracked sort order covering all replicas:
     * {@code <goal name>-<resource>-ALL}.
     *
     * @return the sort order key
     */
    private String sortName() {
        return String.join("-", name(), String.valueOf(resource()), "ALL");
    }

    /**
     * Builds the key of the tracked sort order covering leader replicas:
     * {@code <goal name>-<resource>-LEADER}.
     *
     * @return the sort order key
     */
    private String sortNameByLeader() {
        return String.join("-", name(), String.valueOf(resource()), "LEADER");
    }

    /**
     * Collects, per alive broker, the partitions whose replica score exceeds the hot-partition
     * threshold, and logs a warning when any were found.
     *
     * <p>The threshold for a broker is {@code hotPartitionUtilizationThreshold()} multiplied by
     * the broker's total capacity for {@link #resource()}. Replicas are read from the sort order
     * tracked under {@code sortName()} through its reverse iterator, and iteration for a broker
     * stops at the first replica whose score is not above the threshold.
     *
     * @param clusterModel the cluster model whose alive brokers are scanned
     * @return hot partitions keyed by the id of the broker hosting the replica; empty if none
     */
    private Map<Integer, List<TopicPartition>> analyzeHotPartitions(ClusterModel clusterModel) {
        Map<Integer, List<TopicPartition>> hotPartitionsByBroker = new HashMap<>();
        // Insertion-ordered so the log output follows discovery order.
        Map<TopicPartition, Double> hotPartitionsWithCorrespondingUsages = new LinkedHashMap<>();
        for (Broker broker : clusterModel.aliveBrokers()) {
            double minHotPartitionResourceUsage = this.balancingConstraint.hotPartitionUtilizationThreshold() * broker.capacity().totalCapacityFor(this.resource());
            Iterator<ReplicaWrapper> iterator = broker.trackedSortedReplicas(this.sortName()).reverselySortedIterator();
            while (iterator.hasNext()) {
                ReplicaWrapper replicaWrapper = iterator.next();
                Replica replica = replicaWrapper.replica();
                double avgResourceUsage = replicaWrapper.score();
                // Negated comparison kept on purpose: a NaN score also ends the scan.
                if (!(avgResourceUsage > minHotPartitionResourceUsage)) {
                    break;
                }
                hotPartitionsByBroker
                    .computeIfAbsent(replica.broker().id(), brokerId -> new ArrayList<>())
                    .add(replica.topicPartition());
                hotPartitionsWithCorrespondingUsages.put(replica.topicPartition(), avgResourceUsage);
            }
        }
        if (!hotPartitionsWithCorrespondingUsages.isEmpty()) {
            int hotPartitionsCount = hotPartitionsWithCorrespondingUsages.size();
            LOG.warn("Found {} hot partitions with over {}% broker {} resource usage - their resource usage is {} on brokers {}",
                hotPartitionsCount, this.balancingConstraint.hotPartitionUtilizationThreshold() * 100.0, this.resource(), hotPartitionsWithCorrespondingUsages, hotPartitionsByBroker);
        }
        return hotPartitionsByBroker;
    }

    /**
     * Tells whether the broker, or its host, exceeds the given limits for {@link #resource()}.
     *
     * <p>The host-level check is made only when the host holds replicas and the resource is a
     * host resource; the broker-level check only when the broker holds replicas and the resource
     * is a broker resource.
     *
     * @param broker the broker to check
     * @param brokerCapacityLimit the broker-level limit
     * @param hostCapacityLimit the host-level limit
     * @return {@code true} if either applicable check finds utilization strictly above its limit
     */
    private boolean isUtilizationOverLimit(Broker broker, double brokerCapacityLimit, double hostCapacityLimit) {
        Resource res = resource();
        boolean hostOverLimit = !broker.host().replicas().isEmpty()
            && res.isHostResource()
            && broker.host().load().expectedUtilizationFor(res) > hostCapacityLimit;
        if (hostOverLimit) {
            return true;
        }
        boolean brokerCheckApplies = !broker.replicas().isEmpty() && res.isBrokerResource();
        return brokerCheckApplies && broker.load().expectedUtilizationFor(res) > brokerCapacityLimit;
    }

    /**
     * Checks whether the destination broker stays under the allowed capacity limit after
     * receiving the expected utilization of the source replica for {@link #resource()}.
     *
     * @param sourceReplica the replica whose load would be added
     * @param destinationBroker the broker that would receive the load
     * @return {@code true} if the destination remains under its limit
     */
    private boolean isMovementAcceptableForCapacity(Replica sourceReplica, Broker destinationBroker) {
        return isUtilizationUnderLimitAfterAddingLoad(destinationBroker, sourceReplica.load().expectedUtilizationFor(resource()));
    }

    /**
     * Checks whether swapping the two replicas keeps the broker that gains load under its
     * allowed capacity limit.
     *
     * <p>Only the side whose utilization increases is checked: the source replica's broker when
     * the destination replica is the heavier one, otherwise the destination replica's broker.
     *
     * @param sourceReplica the replica leaving the source broker
     * @param destinationReplica the replica leaving the destination broker
     * @return {@code true} if the broker receiving the net additional load stays under its limit
     */
    private boolean isSwapAcceptableForCapacity(Replica sourceReplica, Replica destinationReplica) {
        double outgoingLoad = sourceReplica.load().expectedUtilizationFor(resource());
        double incomingLoad = destinationReplica.load().expectedUtilizationFor(resource());
        double netGainOnSource = incomingLoad - outgoingLoad;
        if (netGainOnSource > 0.0) {
            return isUtilizationUnderLimitAfterAddingLoad(sourceReplica.broker(), netGainOnSource);
        }
        return isUtilizationUnderLimitAfterAddingLoad(destinationReplica.broker(), -netGainOnSource);
    }

    /**
     * Checks whether the broker (and, for host resources, its host) would remain strictly under
     * the allowed capacity limit after the given utilization is added.
     *
     * @param destinationBroker the broker that would receive the load
     * @param replicaUtilization the utilization to add
     * @return {@code false} if the host-level or broker-level sum reaches or exceeds its limit;
     *         {@code true} otherwise, including when the resource is not a broker resource and
     *         the host check did not fail
     */
    private boolean isUtilizationUnderLimitAfterAddingLoad(Broker destinationBroker, double replicaUtilization) {
        Resource res = resource();
        if (res.isHostResource()) {
            double hostUtilization = destinationBroker.host().load().expectedUtilizationFor(res);
            double hostLimit = balancingConstraint.allowedCapacityForBroker(res, destinationBroker.host().capacity());
            if (hostUtilization + replicaUtilization >= hostLimit) {
                return false;
            }
        }
        if (!res.isBrokerResource()) {
            return true;
        }
        double brokerUtilization = destinationBroker.load().expectedUtilizationFor(res);
        double brokerLimit = balancingConstraint.allowedCapacityForBroker(res, destinationBroker.capacity());
        return brokerUtilization + replicaUtilization < brokerLimit;
    }

    /**
     * Returns the eligible destination brokers that the cluster model reports as under the
     * allowed capacity limit for the given resource, sorted by ascending expected load.
     *
     * <p>For host resources the brokers are ordered by host load first and by broker load on
     * ties; for other resources they are ordered by broker load only.
     *
     * @param clusterModel the cluster model to query
     * @param resource the resource whose limit and load are considered
     * @return the sorted list of candidate destination brokers
     */
    public List<Broker> eligibleBrokersReplicaMove(ClusterModel clusterModel, Resource resource) {
        ClusterModel.CapacityLimitProvider limitProvider = capacity -> balancingConstraint.allowedCapacityForBroker(resource, capacity);
        return clusterModel.brokersUnderCapacityLimit(clusterModel.eligibleDestinationBrokers(), resource, limitProvider)
            .stream()
            .filter(Broker::isEligibleDestination)
            .sorted((left, right) -> {
                double leftBrokerLoad = left.load().expectedUtilizationFor(resource);
                double rightBrokerLoad = right.load().expectedUtilizationFor(resource);
                if (resource.isHostResource()) {
                    double leftHostLoad = left.host().load().expectedUtilizationFor(resource);
                    double rightHostLoad = right.host().load().expectedUtilizationFor(resource);
                    int byHostLoad = Double.compare(leftHostLoad, rightHostLoad);
                    if (byHostLoad != 0) {
                        return byHostLoad;
                    }
                }
                return Double.compare(leftBrokerLoad, rightBrokerLoad);
            })
            .collect(Collectors.toList());
    }

    /**
     * Hook that lets subclasses keep a replica out of the replica-movement step; this base
     * implementation excludes nothing.
     *
     * @param replica the replica being considered for movement
     * @return always {@code false} here
     */
    protected boolean shouldExcludeForReplicaMove(Replica replica) {
        return false;
    }

    /**
     * Immutable bundle of everything a rebalance step needs for one broker: the broker and
     * resource, the allowed and desired capacity limits at broker and host level, the cluster
     * model, and the optimization inputs.
     */
    static class RebalanceContext {
        private final Broker broker;
        private final Resource resource;
        private final double brokerCapacityLimit;
        private final double hostCapacityLimit;
        private final double brokerDesiredCapacityLimit;
        private final double hostDesiredCapacityLimit;
        private final ClusterModel clusterModel;
        private final Set<String> excludedTopics;
        private final Set<Goal> optimizedGoals;
        private final OptimizationOptions optimizationOptions;

        /**
         * Validates the limits and stores all values.
         *
         * @param broker the broker being rebalanced
         * @param resource the resource being balanced
         * @param brokerCapacityLimit the allowed broker-level limit
         * @param hostCapacityLimit the allowed host-level limit
         * @param brokerDesiredCapacityLimit the desired broker-level limit
         * @param hostDesiredCapacityLimit the desired host-level limit
         * @param clusterModel the cluster model to act on
         * @param excludedTopics the topics excluded from balancing
         * @param optimizedGoals the goals that have already been optimized
         * @param optimizationOptions the optimization options
         * @throws IllegalArgumentException if a desired limit is greater than its allowed limit,
         *         or if a desired limit is negative while its allowed limit is not {@code -1.0}
         */
        RebalanceContext(Broker broker, Resource resource, double brokerCapacityLimit, double hostCapacityLimit, double brokerDesiredCapacityLimit, double hostDesiredCapacityLimit, ClusterModel clusterModel, Set<String> excludedTopics, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
            if (brokerDesiredCapacityLimit > brokerCapacityLimit || hostDesiredCapacityLimit > hostCapacityLimit) {
                throw new IllegalArgumentException(String.format("Desired {broker, host} capacity limit (%f, %f) cannot be greater than the allowed {broker, host} capacity limit (%f, %f).", brokerDesiredCapacityLimit, hostDesiredCapacityLimit, brokerCapacityLimit, hostCapacityLimit));
            }
            // NOTE(review): an allowed limit of exactly -1.0 exempts the matching desired limit
            // from the negativity check — presumably a sentinel value; confirm with the producer.
            boolean invalidBrokerLimit = brokerDesiredCapacityLimit < 0.0 && brokerCapacityLimit != -1.0;
            boolean invalidHostLimit = hostDesiredCapacityLimit < 0.0 && hostCapacityLimit != -1.0;
            if (invalidBrokerLimit || invalidHostLimit) {
                throw new IllegalArgumentException(String.format("Desired {broker, host} capacity limit (%f, %f) cannot be negative.", brokerDesiredCapacityLimit, hostDesiredCapacityLimit));
            }
            this.broker = broker;
            this.resource = resource;
            this.brokerCapacityLimit = brokerCapacityLimit;
            this.hostCapacityLimit = hostCapacityLimit;
            this.brokerDesiredCapacityLimit = brokerDesiredCapacityLimit;
            this.hostDesiredCapacityLimit = hostDesiredCapacityLimit;
            this.clusterModel = clusterModel;
            this.excludedTopics = excludedTopics;
            this.optimizedGoals = optimizedGoals;
            this.optimizationOptions = optimizationOptions;
        }

        /** @return the broker being rebalanced */
        public Broker broker() {
            return broker;
        }

        /** @return the resource being balanced */
        public Resource resource() {
            return resource;
        }

        /** @return the allowed broker-level capacity limit */
        public double brokerCapacityLimit() {
            return brokerCapacityLimit;
        }

        /** @return the desired broker-level capacity limit */
        public double brokerDesiredCapacityLimit() {
            return brokerDesiredCapacityLimit;
        }

        /** @return the allowed host-level capacity limit */
        public double hostCapacityLimit() {
            return hostCapacityLimit;
        }

        /** @return the desired host-level capacity limit */
        public double hostDesiredCapacityLimit() {
            return hostDesiredCapacityLimit;
        }

        /** @return the cluster model to act on */
        public ClusterModel clusterModel() {
            return clusterModel;
        }

        /** @return the topics excluded from balancing */
        public Set<String> excludedTopics() {
            return excludedTopics;
        }

        /** @return the goals that have already been optimized */
        public Set<Goal> optimizedGoals() {
            return optimizedGoals;
        }

        /** @return the optimization options */
        public OptimizationOptions optimizationOptions() {
            return optimizationOptions;
        }
    }

    class PartitionMovementRebalance
    extends AbstractDesiredCapacityGoalRebalanceStep {
        PartitionMovementRebalance(RebalanceContext context) {
            super(context);
        }

        @Override
        public boolean doBalance() {
            if (this.context.clusterModel().skipInterCellBalancing()) {
                return this.isBrokerBalanced();
            }
            this.rebalanceByPartitionMove(this.context.broker());
            return this.isBrokerBalanced();
        }

        private List<Replica> replicasInDescendingSize(Collection<Replica> replicas) {
            ArrayList<Replica> replicasList = new ArrayList<Replica>(replicas);
            replicasList.sort((replica1, replica2) -> {
                double replica1Utilization = replica1.load().expectedUtilizationFor(CapacityGoal.this.resource());
                double replica2Utilization = replica2.load().expectedUtilizationFor(CapacityGoal.this.resource());
                return Double.compare(replica2Utilization, replica1Utilization);
            });
            return replicasList;
        }

        private boolean movePartition(Replica replica) {
            TopicPartition topicPartition = replica.topicPartition();
            double maxReplicaResource = this.context.clusterModel.partition(topicPartition).replicas().stream().mapToDouble(r -> r.load().expectedUtilizationFor(CapacityGoal.this.resource())).max().orElseThrow(() -> new IllegalStateException(String.format("CapacityGoal cannot find maxReplicaResource since there is no replica of the topic partition %s in the ClusterModel.", topicPartition)));
            ArrayList<Cell> candidateCells = new ArrayList<Cell>(this.context.clusterModel.cells());
            candidateCells.remove(replica.broker().cell());
            Collections.shuffle(candidateCells);
            for (Cell cell : candidateCells) {
                List<Replica> replicasToMove = this.context.clusterModel().partition(topicPartition).replicasNotInCell(cell);
                boolean replicasOnExcludedBroker = replicasToMove.stream().anyMatch(r -> !r.broker().isEligible());
                if (replicasOnExcludedBroker) break;
                List availableBrokers = cell.aliveBrokers().stream().filter(broker -> broker.availableResource(CapacityGoal.this.resource(), CapacityGoal.this.balancingConstraint) > maxReplicaResource && !broker.hasReplicaOfPartition(topicPartition) && this.context.clusterModel.partition(topicPartition).canAssignReplicaToBroker((Broker)broker) && broker.isEligibleDestination()).collect(Collectors.toList());
                if (availableBrokers.size() < replicasToMove.size()) continue;
                availableBrokers.sort(new Broker.ResourceComparator(CapacityGoal.this.resource(), CapacityGoal.this.balancingConstraint));
                Iterable<List<Broker>> candidateBrokersIterable = EntityCombinator.singleEntityListIterable(availableBrokers, replicasToMove.size());
                Map<Replica, Broker> replicaMoves = GoalUtils.getPartitionMoves(this.context.clusterModel, this.context.optimizedGoals, replicasToMove, candidateBrokersIterable);
                if (replicaMoves.isEmpty()) continue;
                replicaMoves.forEach((replicaToMove, broker) -> CapacityGoal.this.relocateReplica(this.context.clusterModel, replicaToMove.topicPartition(), replicaToMove.broker().id(), broker.id()));
                return true;
            }
            return false;
        }

        private void rebalanceByPartitionMove(Broker sourceBroker) {
            List<Replica> brokerReplicas = this.replicasInDescendingSize(sourceBroker.replicas());
            for (Replica replica : brokerReplicas) {
                boolean success;
                if (!AbstractGoal.shouldExclude(replica, this.context.excludedTopics()) && (success = this.movePartition(replica)) && this.isBrokerBalanced()) break;
            }
        }
    }

    private class ReplicaMovementRebalance
    extends AbstractDesiredCapacityGoalRebalanceStep {
        ReplicaMovementRebalance(RebalanceContext context) {
            super(context);
        }

        @Override
        public boolean doBalance() {
            List<Broker> sortedEligibleDestinationBrokersUnderCapacityLimit = CapacityGoal.this.eligibleBrokersReplicaMove(this.context.clusterModel, CapacityGoal.this.resource());
            for (Replica replica : this.context.broker().trackedSortedReplicas(CapacityGoal.this.sortName()).reverselySortedReplicas()) {
                if (AbstractGoal.shouldExclude(replica, this.context.excludedTopics()) || CapacityGoal.this.shouldExcludeForReplicaMove(replica)) continue;
                int numPartitionCells = GoalUtils.numPartitionCells(this.context.clusterModel.partition(replica.topicPartition()));
                List<Broker> sortedAliveBrokersUnderCapacityLimitInCell = sortedEligibleDestinationBrokersUnderCapacityLimit.stream().filter(broker -> numPartitionCells > 1 || broker.cell().equals(replica.broker().cell())).collect(Collectors.toList());
                Broker b = CapacityGoal.this.maybeApplyBalancingAction(this.context.clusterModel(), replica, sortedAliveBrokersUnderCapacityLimitInCell, ActionType.INTER_BROKER_REPLICA_MOVEMENT, this.context.optimizedGoals(), this.context.optimizationOptions(), Optional.empty());
                if (b == null) {
                    LOG.debug("Failed to move replica {} to any broker in {}", (Object)replica, sortedAliveBrokersUnderCapacityLimitInCell);
                }
                if (this.isBrokerBalanced()) break;
                sortedEligibleDestinationBrokersUnderCapacityLimit = CapacityGoal.this.eligibleBrokersReplicaMove(this.context.clusterModel, CapacityGoal.this.resource());
            }
            return this.isBrokerBalanced();
        }
    }

    private class LeadershipMovementRebalance
    extends AbstractDesiredCapacityGoalRebalanceStep {
        LeadershipMovementRebalance(RebalanceContext context) {
            super(context);
        }

        @Override
        public boolean doBalance() {
            if (CapacityGoal.this.shouldTryLeadershipMovement(this.context.resource())) {
                List<Replica> sortedLeadersInSourceBroker = this.context.broker().trackedSortedReplicas(CapacityGoal.this.sortNameByLeader()).reverselySortedReplicas();
                for (Replica leader : sortedLeadersInSourceBroker) {
                    if (AbstractGoal.shouldExclude(leader, this.context.excludedTopics())) continue;
                    List<Replica> onlineFollowers = this.context.clusterModel().partition(leader.topicPartition()).onlineFollowers();
                    GoalUtils.sortReplicasInAscendingOrderByBrokerResourceUtilization(onlineFollowers, this.context.resource());
                    List<Broker> eligibleBrokers = onlineFollowers.stream().map(Replica::broker).filter(Broker::isEligibleDestination).collect(Collectors.toList());
                    Broker b = CapacityGoal.this.maybeApplyBalancingAction(this.context.clusterModel(), leader, eligibleBrokers, ActionType.LEADERSHIP_MOVEMENT, this.context.optimizedGoals(), this.context.optimizationOptions(), Optional.empty());
                    if (b == null) {
                        LOG.debug("Failed to move leader replica {} to any other brokers in {}", (Object)leader, eligibleBrokers);
                    }
                    if (!this.isBrokerUtilizationUnderLimit()) continue;
                    break;
                }
            }
            return this.isBrokerBalanced();
        }
    }

    /**
     * First step of the rebalance chain: performs no movement and only reports whether the
     * broker is already balanced with respect to the allowed (not the desired) capacity limits.
     */
    private class CheckShouldRebalance
    extends AbstractCapacityGoalRebalanceStep {
        CheckShouldRebalance(RebalanceContext context) {
            super(context);
        }

        /** Uses the allowed host-level limit from the context. */
        @Override
        double hostCapacityLimit() {
            return this.context.hostCapacityLimit();
        }

        /** Uses the allowed broker-level limit from the context. */
        @Override
        double brokerCapacityLimit() {
            return this.context.brokerCapacityLimit();
        }

        /**
         * @return {@code true} if the broker is already balanced, so no movement is required
         */
        @Override
        public boolean doBalance() {
            return this.isBrokerBalanced();
        }
    }

    /**
     * Base class for the movement steps: measures the broker against the desired capacity
     * limits from the context instead of the allowed ones.
     */
    private abstract class AbstractDesiredCapacityGoalRebalanceStep
    extends AbstractCapacityGoalRebalanceStep {
        AbstractDesiredCapacityGoalRebalanceStep(RebalanceContext context) {
            super(context);
        }

        /** Uses the desired host-level limit from the context. */
        @Override
        double hostCapacityLimit() {
            return this.context.hostDesiredCapacityLimit();
        }

        /** Uses the desired broker-level limit from the context. */
        @Override
        double brokerCapacityLimit() {
            return this.context.brokerDesiredCapacityLimit();
        }
    }

    /**
     * Common base of all rebalance steps of this goal: holds the rebalance context and answers
     * whether the context's broker is within the limits chosen by the concrete step.
     */
    private abstract class AbstractCapacityGoalRebalanceStep implements RebalanceStep {
        protected final RebalanceContext context;

        AbstractCapacityGoalRebalanceStep(RebalanceContext context) {
            this.context = context;
        }

        /** @return the host-level limit this step measures against */
        abstract double hostCapacityLimit();

        /** @return the broker-level limit this step measures against */
        abstract double brokerCapacityLimit();

        /**
         * @return {@code true} if the broker is not over the limits supplied by
         *         {@link #brokerCapacityLimit()} and {@link #hostCapacityLimit()}
         */
        protected boolean isBrokerUtilizationUnderLimit() {
            boolean overLimit = CapacityGoal.this.isUtilizationOverLimit(context.broker(), brokerCapacityLimit(), hostCapacityLimit());
            return !overLimit;
        }

        /**
         * @return {@code true} if the broker is under the limits and holds no offline replicas
         */
        protected boolean isBrokerBalanced() {
            if (!isBrokerUtilizationUnderLimit()) {
                return false;
            }
            return context.broker().currentOfflineReplicas().isEmpty();
        }
    }
}

