package io.confluent.kafka.databalancing;

import io.confluent.kafka.databalancing.RebalancePolicy;
import io.confluent.kafka.databalancing.constraint.Constraints;
import io.confluent.kafka.databalancing.constraint.RebalanceConstraints;
import io.confluent.kafka.databalancing.exception.ValidationException;
import io.confluent.kafka.databalancing.metric.Metrics;
import io.confluent.kafka.databalancing.topology.Broker;
import io.confluent.kafka.databalancing.topology.Replica;
import io.confluent.kafka.databalancing.topology.ReplicaAssignment;
import io.confluent.kafka.databalancing.topology.TopologyUtils;
import io.confluent.kafka.databalancing.view.BrokerCountFairView;
import io.confluent.kafka.databalancing.view.BrokerSizeFairView;
import io.confluent.kafka.databalancing.view.ClusterView;
import io.confluent.kafka.databalancing.view.RackCountFairView;
import io.confluent.kafka.databalancing.view.RackSizeFairView;
import io.confluent.kafka.databalancing.view.TopicViewSupplier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import kafka.admin.BrokerMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/databalancing/MovesOptimisedRebalancePolicy.class */
public class MovesOptimisedRebalancePolicy implements RebalancePolicy {
    /** Class-wide SLF4J logger. */
    private static final Logger logger = LoggerFactory.getLogger(MovesOptimisedRebalancePolicy.class);
    /** Number of replica moves performed; incremented by {@code movePartition} each time it relocates a replica. */
    private int replicasMoved = 0;
    /** Leader-move counter. NOTE(review): not updated in the visible part of this file; confirm where it is maintained. */
    private int leadersMoved = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/kafka/databalancing/MovesOptimisedRebalancePolicy$BiPredicate.class */
    /**
     * Boolean-valued test over two values of the same type.
     *
     * <p>In this class it is implemented by anonymous classes that compare the current leader
     * broker of a partition with one of its other replica brokers.
     *
     * @param <T> type of both operands
     */
    public interface BiPredicate<T> {
        /**
         * Evaluates the test for the given pair.
         *
         * @param t first operand
         * @param t2 second operand
         * @return the result of the test
         */
        boolean test(T t, T t2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/databalancing/MovesOptimisedRebalancePolicy$ConstraintType.class */
    /** Kinds of placement constraint that a candidate move can fail. */
    public enum ConstraintType {
        /** Recorded when {@code obeysRackConstraint} rejects the move. */
        RACK,
        /** Recorded when {@code obeysDiskSpaceConstraint} rejects the move. */
        DISK_SPACE,
        /** Recorded when {@code obeysPartitionConstraint} rejects the move. */
        PARTITION
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/databalancing/MovesOptimisedRebalancePolicy$MoveAttempt.class */
    /**
     * Result of checking one candidate move (a replica and a destination broker) against the
     * placement constraints. Holds the constraints that the move failed, if any.
     */
    public static class MoveAttempt {
        /** Replica that was considered for relocation. */
        final Replica replica;
        /** Broker the replica would be moved to. */
        final Broker destination;
        /** Constraints the move violates; empty when the move is acceptable. */
        final List<ConstraintType> failedConstraints;

        private MoveAttempt(Replica replica, Broker broker, List<ConstraintType> list) {
            this.replica = replica;
            this.destination = broker;
            this.failedConstraints = list;
        }

        /** @return {@code true} when no constraint was violated */
        boolean allowMove() {
            return failedConstraints.isEmpty();
        }

        /** @return {@code true} when the rack constraint is the one and only violation */
        boolean allowMoveWithRelaxedRackConstraint() {
            if (failedConstraints.size() != 1) {
                return false;
            }
            return failedConstraints.get(0) == ConstraintType.RACK;
        }

        /** @return the broker the replica would be moved to */
        Broker getDestination() {
            return destination;
        }

        /**
         * Renders this attempt as {@code [replica -> brokerId] failed constraints: [A, B]}.
         *
         * @return human-readable description of the violated constraints
         */
        String errorMessage() {
            StringBuilder joined = new StringBuilder();
            String separator = "";
            for (ConstraintType failed : failedConstraints) {
                joined.append(separator).append(failed);
                separator = ", ";
            }
            return String.format("[%s -> %s] failed constraints: [%s]", replica, Integer.valueOf(destination.id()), joined);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/databalancing/MovesOptimisedRebalancePolicy$MoveResult.class */
    /**
     * Aggregated outcome of trying to relocate one replica: the attempt that succeeded (if any)
     * together with the attempts that were rejected.
     */
    public static class MoveResult {
        /** The attempt that was carried out, or {@code null} when every candidate was rejected. */
        public MoveAttempt successfulMove;
        /** Attempts that were rejected. */
        public List<MoveAttempt> failedMoves;

        /** Creates a result with no successful move. */
        private MoveResult(List<MoveAttempt> list) {
            this(null, list);
        }

        /** Creates a result with the given successful move and rejected attempts. */
        private MoveResult(MoveAttempt moveAttempt, List<MoveAttempt> list) {
            this.successfulMove = moveAttempt;
            this.failedMoves = list;
        }

        /** @return {@code true} when a successful move is recorded */
        public boolean success() {
            return successfulMove != null;
        }

        /** @return the successful attempt, or {@code null} if there was none */
        public MoveAttempt getSuccessfulMove() {
            return successfulMove;
        }

        /** @return the rejected attempts */
        public List<MoveAttempt> getFailedMoves() {
            return failedMoves;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/kafka/databalancing/MovesOptimisedRebalancePolicy$Predicate.class */
    /**
     * Boolean-valued test over a single value.
     *
     * <p>In this class it is implemented by anonymous classes that decide whether a candidate
     * broker or replica is an acceptable target for a move.
     *
     * @param <T> type of the value being tested
     */
    public interface Predicate<T> {
        /**
         * Evaluates the test for the given value.
         *
         * @param t value to test
         * @return the result of the test
         */
        boolean test(T t);
    }

    /**
     * Computes a new replica assignment by running an ordered series of balancing passes over a
     * mutable rebalance context, logging a numbered banner before each pass.
     *
     * <p>Pass order: restore missing replicas; move replicas off brokers that are to be removed;
     * per-topic balancing across racks (count, then size) when racks exist; per-rack, per-topic
     * balancing across brokers (count, then size); per-topic leader balancing; overall balancing
     * across racks (count, then size) when racks exist; per-rack balancing across brokers (count,
     * then size); and finally overall leader balancing.
     *
     * <p>When no racks are configured, the per-rack passes run exactly once with a {@code null}
     * rack.
     *
     * @param list broker metadata, forwarded to {@code DefaultRebalanceContext.create}
     * @param replicaAssignment current assignment; its map form seeds the context
     * @param map forwarded to {@code DefaultRebalanceContext.create}
     *     (NOTE(review): presumably per-topic replication factors; confirm against the factory)
     * @param metrics forwarded to {@code DefaultRebalanceContext.create}
     * @param config forwarded to {@code DefaultRebalanceContext.create}
     * @param list2 forwarded to {@code DefaultRebalanceContext.create}
     *     (NOTE(review): presumably the brokers to be removed; confirm against the factory)
     * @return assignment built from the context after all passes have run
     * @throws ValidationException if a replica cannot be moved off a broker that is to be removed
     */
    @Override // io.confluent.kafka.databalancing.RebalancePolicy
    public ReplicaAssignment rebalancePartitions(List<BrokerMetadata> list, ReplicaAssignment replicaAssignment, Map<String, Integer> map, Metrics metrics, RebalancePolicy.Config config, List<Broker> list2) {
        MutableRebalanceContext context = DefaultRebalanceContext.create(list, replicaAssignment.asMap(), map, metrics, config, list2);
        Constraints constraints = new Constraints(context, null);
        List<String> racks = Utils.sorted(context.allRacks());
        List<String> rackScopes = racks;
        if (racks.isEmpty()) {
            // A single null entry makes the per-rack passes run once across all brokers.
            // Arrays.asList(null) must not be used here: it is compiled as a non-varargs call
            // with a null array and throws NullPointerException.
            rackScopes = Collections.<String>singletonList(null);
            logger.info("Executing cluster-wide as no racks configured");
        }

        // Running number shown in each banner; incremented after every banner is logged.
        int step = 1;

        logger.info("**** {}. Ensure no under-replicated partitions ****", Integer.valueOf(step++));
        fullyReplicate(context, constraints);

        logger.info("**** {}. Move replicas from brokers to be removed ****", Integer.valueOf(step++));
        moveReplicasFromRemovedBrokers(context, constraints);

        List<String> topics = Utils.sorted(context.topics());
        if (!racks.isEmpty()) {
            for (String topic : topics) {
                logger.info("**** {}. Ensure balanced replica count across racks for topic: {} ****", Integer.valueOf(step++), topic);
                replicaTopicFairness(context, new RackCountFairView(context, topic));
                logger.info("**** {}. Ensure balanced replica size across racks for topic: {} ****", Integer.valueOf(step++), topic);
                replicaTopicFairness(context, new RackSizeFairView(context, topic));
            }
        }

        for (String rack : rackScopes) {
            String rackFragment = rackLogMessageFragment(rack);
            for (String topic : Utils.sorted(context.topics())) {
                logger.info("**** {}. Ensure balanced replica count for topic: {} {} ****", new Object[]{Integer.valueOf(step++), topic, rackFragment});
                replicaTopicFairness(context, new BrokerCountFairView(context, rack, topic));
                logger.info("**** {}. Ensure balanced replica size for topic: {} {} ****", new Object[]{Integer.valueOf(step++), topic, rackFragment});
                replicaTopicFairness(context, new BrokerSizeFairView(context, rack, topic));
            }
        }

        for (String topic : topics) {
            logger.info("**** {}. Ensure balanced leaders for topic: {} {} ****", new Object[]{Integer.valueOf(step++), topic, maybeClusterWide(racks)});
            leaderTopicFairness(context, new BrokerCountFairView(context, null, topic));
        }

        if (!racks.isEmpty()) {
            logger.info("**** {}. Ensure balanced replica count across racks ****", Integer.valueOf(step++));
            replicaFairness(context, new RackCountFairView(context, null), TopicViewSupplier.rackCount(context));
            logger.info("**** {}. Ensure balanced replica size across racks ****", Integer.valueOf(step++));
            replicaSizeFairness(context, new RackSizeFairView(context, null), TopicViewSupplier.rackSize(context));
        }

        for (String rack : rackScopes) {
            String rackFragment = rackLogMessageFragment(rack);
            logger.info("**** {}. Ensure balanced replica count {} ****", Integer.valueOf(step++), rackFragment);
            replicaFairness(context, new BrokerCountFairView(context, rack, null), TopicViewSupplier.brokerCount(context, rack));
            logger.info("**** {}. Ensure balanced replica size {} ****", Integer.valueOf(step++), rackFragment);
            replicaSizeFairness(context, new BrokerSizeFairView(context, rack, null), TopicViewSupplier.brokerSize(context, rack));
        }

        logger.info("**** {}. Ensure balanced leaders for brokers {} ****", Integer.valueOf(step), maybeClusterWide(racks));
        leaderFairness(context, new BrokerCountFairView(context, null, null), TopicViewSupplier.brokerCount(context, null));

        log(context);
        return new ReplicaAssignment(partitionToReplicaIds(context));
    }

    /**
     * Log-message suffix for the leader-balancing banners.
     *
     * @param list the configured racks
     * @return the empty string when {@code list} is empty, otherwise {@code "(cluster-wide)"}
     */
    private String maybeClusterWide(List<String> list) {
        if (list.isEmpty()) {
            return "";
        }
        return "(cluster-wide)";
    }

    /**
     * Log-message suffix naming the rack a pass is restricted to.
     *
     * @param str rack name, or {@code null} when the pass is not restricted to a rack
     * @return the empty string for {@code null}, otherwise {@code "across brokers on rack <rack>"}
     */
    private String rackLogMessageFragment(String str) {
        if (str != null) {
            return "across brokers on rack " + str;
        }
        return "";
    }

    /**
     * Repeatedly moves replicas from above-par brokers of the given view to below-par brokers
     * until a complete sweep makes no move.
     *
     * <p>Each sweep iterates the above-par replicas as they were when the sweep started. Because
     * the view is refreshed after every successful move, each replica is re-checked against the
     * current view before a move is attempted.
     *
     * @param mutableRebalanceContext cluster state that the moves are applied to
     * @param clusterView view that defines above/below par and supplies the constraints
     */
    private void replicaTopicFairness(MutableRebalanceContext mutableRebalanceContext, ClusterView clusterView) {
        ClusterView view = clusterView;
        boolean movedInSweep;
        do {
            movedInSweep = false;
            for (final Replica candidate : view.replicasOnAboveParBrokers()) {
                if (!view.replicasOnAboveParBrokers().contains(candidate)) {
                    // An earlier move in this sweep already took this replica off the above-par set.
                    continue;
                }
                final ClusterView snapshot = view;
                RebalanceConstraints rules = view.constraints();
                Collection<Broker> belowPar = view.brokersWithBelowParReplicaFairness();
                Predicate<Broker> improvesFairness = new Predicate<Broker>() {
                    @Override
                    public boolean test(Broker target) {
                        return snapshot.compareReplicaFairness(candidate.topicPartition(), candidate.broker(), target).isGreater();
                    }
                };
                if (moveReplicaToBelowParBroker(mutableRebalanceContext, rules, candidate, belowPar, improvesFairness)) {
                    movedInSweep = true;
                    view = view.refresh(mutableRebalanceContext);
                }
            }
        } while (movedInSweep);
    }

    /**
     * Runs {@code replicaFairness} first, then repeatedly swaps a replica on an above-par broker
     * with a replica on a below-par broker until a complete sweep performs no swap.
     *
     * <p>A swap is accepted only if the view's comparison for the pair is strictly greater and the
     * per-topic views of both replicas' topics compare greater-or-equal.
     *
     * @param mutableRebalanceContext cluster state that the swaps are applied to
     * @param clusterView view that defines above/below par and supplies the constraints
     * @param topicViewSupplier source of per-topic views; cleared after every successful swap
     */
    void replicaSizeFairness(MutableRebalanceContext mutableRebalanceContext, ClusterView clusterView, final TopicViewSupplier<?> topicViewSupplier) {
        ClusterView view = replicaFairness(mutableRebalanceContext, clusterView, topicViewSupplier);
        boolean swappedInSweep;
        do {
            swappedInSweep = false;
            for (final Replica heavy : view.replicasOnAboveParBrokers()) {
                if (!view.replicasOnAboveParBrokers().contains(heavy)) {
                    // No longer on an above-par broker after an earlier swap in this sweep.
                    continue;
                }
                final ClusterView snapshot = view;
                Predicate<Replica> swapImproves = new Predicate<Replica>() {
                    // Per-topic view for the above-par replica's topic, looked up once per candidate.
                    private final ClusterView aboveParTopicView = topicViewSupplier.get(heavy.topicPartition().topic());

                    @Override
                    public boolean test(Replica light) {
                        Broker lightBroker = light.broker();
                        if (!snapshot.compareReplicaFairness(heavy.broker(), heavy.topicPartition(), lightBroker, light.topicPartition()).isGreater()) {
                            return false;
                        }
                        if (!this.aboveParTopicView.compareReplicaFairness(heavy.broker(), heavy.topicPartition(), lightBroker, light.topicPartition()).isGreaterOrEqual()) {
                            return false;
                        }
                        return topicViewSupplier.get(light.topicPartition().topic()).compareReplicaFairness(heavy.broker(), heavy.topicPartition(), lightBroker, light.topicPartition()).isGreaterOrEqual();
                    }
                };

                // Swap candidates: every replica on each below-par broker, in the order
                // produced by sortPartitionsBySize.
                List<Replica> swapCandidates = new ArrayList<>();
                for (Broker lightBroker : view.brokersWithBelowParReplicaFairness()) {
                    Iterator<TopicPartition> partitions = TopologyUtils.sortPartitionsBySize(mutableRebalanceContext, mutableRebalanceContext.replicas(lightBroker)).iterator();
                    while (partitions.hasNext()) {
                        swapCandidates.add(new Replica(partitions.next(), lightBroker));
                    }
                }

                if (switchReplicaWithBelowParBrokerReplica(mutableRebalanceContext, view.constraints(), heavy, swapCandidates, swapImproves)) {
                    swappedInSweep = true;
                    view = view.refresh(mutableRebalanceContext);
                    topicViewSupplier.clear();
                }
            }
        } while (swappedInSweep);
    }

    /**
     * Tries to swap {@code replica} with the first candidate for which the disk-space, partition
     * and rack constraints hold in both directions and the predicate accepts the candidate.
     *
     * <p>All checks are evaluated for every candidate before the combined result is tested.
     *
     * @param mutableRebalanceContext cluster state that the swap is applied to
     * @param rebalanceConstraints constraints that both moves must satisfy
     * @param replica replica to move away from its current broker
     * @param collection candidate replicas to swap with, tried in iteration order
     * @param predicate additional acceptance test for a candidate
     * @return {@code true} if a swap was performed, {@code false} if no candidate qualified
     */
    private boolean switchReplicaWithBelowParBrokerReplica(MutableRebalanceContext mutableRebalanceContext, RebalanceConstraints rebalanceConstraints, Replica replica, Collection<Replica> collection, Predicate<Replica> predicate) {
        logger.debug("Moving {}, # belowPar replica candidates: {} ", replica, Integer.valueOf(collection.size()));
        for (Replica other : collection) {
            Broker otherBroker = other.broker();
            TopicPartition otherPartition = other.topicPartition();

            boolean diskOkOutbound = rebalanceConstraints.obeysDiskSpaceConstraint(replica.topicPartition(), replica.broker(), other.topicPartition());
            boolean diskOkInbound = rebalanceConstraints.obeysDiskSpaceConstraint(otherPartition, otherBroker, replica.topicPartition());
            boolean partitionOkOutbound = rebalanceConstraints.obeysPartitionConstraint(replica.topicPartition(), otherBroker);
            boolean partitionOkInbound = rebalanceConstraints.obeysPartitionConstraint(otherPartition, replica.broker());
            boolean rackOkOutbound = rebalanceConstraints.obeysRackConstraint(replica.topicPartition(), replica.broker(), otherBroker);
            boolean rackOkInbound = rebalanceConstraints.obeysRackConstraint(otherPartition, otherBroker, replica.broker());
            boolean accepted = predicate.test(other);

            boolean swapAllowed = diskOkOutbound && diskOkInbound
                    && partitionOkOutbound && partitionOkInbound
                    && accepted
                    && rackOkOutbound && rackOkInbound;
            if (!swapAllowed) {
                continue;
            }
            movePartition(mutableRebalanceContext, replica.topicPartition(), replica.broker(), otherBroker);
            movePartition(mutableRebalanceContext, other.topicPartition(), otherBroker, replica.broker());
            return true;
        }
        logger.debug("Replica {} could not be moved despite attempting {} different brokers", replica, Integer.valueOf(collection.size()));
        return false;
    }

    /**
     * Builds the exception reported when a partition cannot be moved off a removed broker.
     *
     * @param broker the removed broker that still hosts the partition
     * @param topicPartition the partition that could not be moved
     * @param moveResult result whose failed attempts are listed, comma-separated, in the message
     * @return the exception to throw; this method does not throw it itself
     */
    private ValidationException buildMoveException(Broker broker, TopicPartition topicPartition, MoveResult moveResult) {
        StringBuilder details = new StringBuilder();
        String separator = "";
        for (MoveAttempt failure : moveResult.getFailedMoves()) {
            details.append(separator).append(failure.errorMessage());
            separator = ", ";
        }
        String message = String.format("ERROR: Could not move partition %s from removed broker %s, due to the following constraint errors: %s.", topicPartition, Integer.valueOf(broker.id()), details);
        return new ValidationException(message);
    }

    /**
     * Moves every replica hosted on a broker that is to be removed onto another broker.
     *
     * @param mutableRebalanceContext cluster state that the moves are applied to
     * @param constraints constraints each move is checked against
     * @throws ValidationException as soon as one replica cannot be placed on any other broker
     */
    private void moveReplicasFromRemovedBrokers(MutableRebalanceContext mutableRebalanceContext, Constraints constraints) {
        for (Broker departing : mutableRebalanceContext.brokersToBeRemoved()) {
            for (TopicPartition hosted : mutableRebalanceContext.replicas(departing)) {
                Replica toEvacuate = new Replica(hosted, departing);
                MoveResult outcome = moveReplicaFromRemovedBroker(mutableRebalanceContext, constraints, toEvacuate);
                if (outcome.success()) {
                    continue;
                }
                throw buildMoveException(departing, hosted, outcome);
            }
        }
    }

    /**
     * Moves one replica off a broker that is to be removed.
     *
     * <p>First pass: candidate brokers from
     * {@code TopologyUtils.leastLoadedBrokersPreferringTheseRacks} are tried in order, and the
     * first one that violates no constraint receives the replica. Second pass: entered only when
     * the rack constraint is not satisfied with the replica's own broker as both source and
     * destination. Candidates from {@code TopologyUtils.leastLoadedBrokersPreferRacksAvoidRacks}
     * are then accepted if the rack constraint was their only violation in the first pass.
     *
     * @param mutableRebalanceContext cluster state that the move is applied to
     * @param constraints constraints each candidate broker is checked against
     * @param replica the replica to relocate
     * @return a successful result if the replica was moved, otherwise a result carrying every
     *     rejected first-pass attempt
     */
    private MoveResult moveReplicaFromRemovedBroker(MutableRebalanceContext mutableRebalanceContext, Constraints constraints, Replica replica) {
        String sourceRack = mutableRebalanceContext.brokerRack(replica.broker());
        Set<String> preferredRacks = sourceRack == null ? Collections.<String>emptySet() : Collections.singleton(sourceRack);

        // First pass: remember why each broker was rejected, keyed by broker id.
        Map<Integer, MoveAttempt> rejectedByBrokerId = new HashMap<Integer, MoveAttempt>();
        for (Broker candidate : TopologyUtils.leastLoadedBrokersPreferringTheseRacks(mutableRebalanceContext, preferredRacks)) {
            MoveAttempt attempt = failedConstraints(constraints, replica, candidate);
            if (attempt.allowMove()) {
                movePartition(mutableRebalanceContext, replica.topicPartition(), replica.broker(), candidate);
                return new MoveResult(attempt, new ArrayList<MoveAttempt>());
            }
            rejectedByBrokerId.put(Integer.valueOf(candidate.id()), attempt);
        }

        // Second pass: only when the rack check fails with the replica's own broker as destination.
        if (!constraints.obeysRackConstraint(replica.topicPartition(), replica.broker(), replica.broker())) {
            Set<String> racksToAvoid = new HashSet<String>(mutableRebalanceContext.racks(mutableRebalanceContext.brokers(replica.topicPartition())));
            racksToAvoid.remove(sourceRack);
            for (Broker candidate : TopologyUtils.leastLoadedBrokersPreferRacksAvoidRacks(mutableRebalanceContext, preferredRacks, racksToAvoid)) {
                MoveAttempt attempt = rejectedByBrokerId.get(Integer.valueOf(candidate.id()));
                // A broker that was not evaluated in the first pass has no recorded attempt;
                // skip it instead of dereferencing null.
                if (attempt == null || !attempt.allowMoveWithRelaxedRackConstraint()) {
                    continue;
                }
                Map<Integer, String> rackByBrokerId = new HashMap<Integer, String>();
                for (Broker host : mutableRebalanceContext.brokers(replica.topicPartition())) {
                    rackByBrokerId.put(Integer.valueOf(host.id()), mutableRebalanceContext.brokerRack(host));
                }
                logger.warn("Replica {} rack={} move to {} rack={} allowed despite failing rack constraint as current partition assignment already fails the rack constraint. Partition broker and rack assignment prior to move: {}.", new Object[]{replica, mutableRebalanceContext.brokerRack(replica.broker()), candidate, mutableRebalanceContext.brokerRack(candidate), rackByBrokerId});
                movePartition(mutableRebalanceContext, replica.topicPartition(), replica.broker(), candidate);
                return new MoveResult(attempt, new ArrayList<MoveAttempt>());
            }
        }
        return new MoveResult(new ArrayList<MoveAttempt>(rejectedByBrokerId.values()));
    }

    /**
     * Checks a candidate move against the partition, rack and disk-space constraints, in that
     * order, and records each one that is violated.
     *
     * @param constraints constraints to evaluate
     * @param replica replica that would be moved
     * @param broker destination broker under consideration
     * @return an attempt listing the violated constraints (empty when the move is acceptable)
     */
    private MoveAttempt failedConstraints(Constraints constraints, Replica replica, Broker broker) {
        List<ConstraintType> violations = new ArrayList<>();
        boolean partitionOk = constraints.obeysPartitionConstraint(replica.topicPartition(), broker);
        if (!partitionOk) {
            violations.add(ConstraintType.PARTITION);
        }
        boolean rackOk = constraints.obeysRackConstraint(replica.topicPartition(), replica.broker(), broker);
        if (!rackOk) {
            violations.add(ConstraintType.RACK);
        }
        boolean diskOk = constraints.obeysDiskSpaceConstraint(replica.topicPartition(), broker);
        if (!diskOk) {
            violations.add(ConstraintType.DISK_SPACE);
        }
        return new MoveAttempt(replica, broker, violations);
    }

    /**
     * Adds replicas to every partition that has fewer brokers than its topic's replication
     * factor. Gives up on a partition, with a warning, as soon as no broker can accept another
     * replica.
     *
     * @param mutableRebalanceContext cluster state that the replicas are added to
     * @param constraints constraints each new replica placement is checked against
     */
    void fullyReplicate(MutableRebalanceContext mutableRebalanceContext, Constraints constraints) {
        for (TopicPartition partition : mutableRebalanceContext.allPartitions()) {
            int targetReplicas = mutableRebalanceContext.replicationFactor(partition.topic());
            // Racks are looked up once per partition, before any replica is added.
            Set<String> partitionRacks = TopologyUtils.racksFor(mutableRebalanceContext, partition);
            while (targetReplicas > mutableRebalanceContext.brokers(partition).size()) {
                boolean added = addReplica(mutableRebalanceContext, constraints, partitionRacks, partition);
                if (!added) {
                    logger.warn("Could not create replica due to either rack, partition or disk space constraints. Thus partition {} will remain under-replicated.", partition);
                    break;
                }
            }
        }
    }

    /**
     * Adds one replica of the partition to the first broker, in the order given by
     * {@code TopologyUtils.leastLoadedBrokersPreferringOtherRacks}, that satisfies the partition,
     * rack and disk-space constraints.
     *
     * @param mutableRebalanceContext cluster state that the replica is added to
     * @param constraints constraints the placement must satisfy
     * @param set racks passed to the broker-ordering helper
     * @param topicPartition partition that needs another replica
     * @return {@code true} if a replica was added, {@code false} if no broker qualified
     */
    private boolean addReplica(MutableRebalanceContext mutableRebalanceContext, Constraints constraints, Set<String> set, TopicPartition topicPartition) {
        for (Broker target : TopologyUtils.leastLoadedBrokersPreferringOtherRacks(mutableRebalanceContext, set)) {
            if (!constraints.obeysPartitionConstraint(topicPartition, target)) {
                continue;
            }
            // There is no source broker when a replica is newly created, hence the null.
            if (!constraints.obeysRackConstraint(topicPartition, null, target)) {
                continue;
            }
            if (!constraints.obeysDiskSpaceConstraint(topicPartition, target)) {
                continue;
            }
            mutableRebalanceContext.addReplica(topicPartition, target);
            return true;
        }
        return false;
    }

    /**
     * Repeatedly moves replicas from above-par brokers of the given view to below-par brokers
     * until a complete sweep makes no move.
     *
     * <p>A move is accepted only if the view's comparison is strictly greater and the per-topic
     * view of the replica's topic compares greater-or-equal.
     *
     * @param mutableRebalanceContext cluster state that the moves are applied to
     * @param clusterView view that defines above/below par and supplies the constraints
     * @param topicViewSupplier source of per-topic views; cleared after every successful move
     * @return the view as last refreshed (the original view if nothing moved)
     */
    private ClusterView replicaFairness(MutableRebalanceContext mutableRebalanceContext, ClusterView clusterView, final TopicViewSupplier<?> topicViewSupplier) {
        ClusterView view = clusterView;
        boolean movedInSweep;
        do {
            movedInSweep = false;
            for (final Replica candidate : view.replicasOnAboveParBrokers()) {
                if (!view.replicasOnAboveParBrokers().contains(candidate)) {
                    // An earlier move in this sweep already took this replica off the above-par set.
                    continue;
                }
                final ClusterView snapshot = view;
                RebalanceConstraints rules = view.constraints();
                Collection<Broker> belowPar = view.brokersWithBelowParReplicaFairness();
                Predicate<Broker> improvesFairness = new Predicate<Broker>() {
                    // Per-topic view for the candidate's topic, looked up once per candidate.
                    private final ClusterView topicView = topicViewSupplier.get(candidate.topicPartition().topic());

                    @Override
                    public boolean test(Broker target) {
                        if (!snapshot.compareReplicaFairness(candidate.topicPartition(), candidate.broker(), target).isGreater()) {
                            return false;
                        }
                        return this.topicView.compareReplicaFairness(candidate.topicPartition(), candidate.broker(), target).isGreaterOrEqual();
                    }
                };
                if (moveReplicaToBelowParBroker(mutableRebalanceContext, rules, candidate, belowPar, improvesFairness)) {
                    movedInSweep = true;
                    view = view.refresh(mutableRebalanceContext);
                    topicViewSupplier.clear();
                }
            }
        } while (movedInSweep);
        return view;
    }

    /**
     * Moves {@code replica} to the first candidate broker that satisfies the partition, rack and
     * disk-space constraints and is accepted by the predicate.
     *
     * <p>All four checks are evaluated for every candidate so that a rejected candidate can be
     * logged with the outcome of each check.
     *
     * @param mutableRebalanceContext cluster state that the move is applied to
     * @param rebalanceConstraints constraints the move must satisfy
     * @param replica replica to move
     * @param collection candidate destination brokers, tried in iteration order
     * @param predicate additional acceptance test for a destination broker
     * @return {@code true} if the replica was moved, {@code false} if no candidate qualified
     */
    private boolean moveReplicaToBelowParBroker(MutableRebalanceContext mutableRebalanceContext, RebalanceConstraints rebalanceConstraints, Replica replica, Collection<Broker> collection, Predicate<Broker> predicate) {
        logger.debug("Moving {}, # belowPar candidates: {} ", replica, Integer.valueOf(collection.size()));
        for (Broker target : collection) {
            boolean partitionOk = rebalanceConstraints.obeysPartitionConstraint(replica.topicPartition(), target);
            boolean rackOk = rebalanceConstraints.obeysRackConstraint(replica.topicPartition(), replica.broker(), target);
            boolean diskOk = rebalanceConstraints.obeysDiskSpaceConstraint(replica.topicPartition(), target);
            boolean fair = predicate.test(target);

            boolean allowed = rackOk && partitionOk && diskOk && fair;
            if (allowed) {
                movePartition(mutableRebalanceContext, replica.topicPartition(), replica.broker(), target);
                return true;
            }
            Object[] outcome = {target, Boolean.valueOf(rackOk), Boolean.valueOf(partitionOk), Boolean.valueOf(diskOk), Boolean.valueOf(fair)};
            logger.debug("Move to {} failed due to rack/partition/disk space/fairness constraints: {}, {}, {}, {}", outcome);
        }
        logger.debug("Replica {} could not be moved despite attempting {} different brokers", replica, Integer.valueOf(collection.size()));
        return false;
    }

    /**
     * Moves a partition replica from one broker to another in the context and counts the move.
     * Does nothing except log when source and destination are the same broker.
     *
     * @param mutableRebalanceContext cluster state that the move is applied to
     * @param topicPartition partition whose replica is moved
     * @param broker source broker
     * @param broker2 destination broker
     */
    private void movePartition(MutableRebalanceContext mutableRebalanceContext, TopicPartition topicPartition, Broker broker, Broker broker2) {
        boolean sameBroker = broker2.equals(broker);
        if (sameBroker) {
            logger.debug("Movement was not made as both source and destination broker are the same: {}", Integer.valueOf(broker.id()));
            return;
        }
        mutableRebalanceContext.movePartition(topicPartition, broker, broker2);
        this.replicasMoved += 1;
        if (!logger.isDebugEnabled()) {
            // Skip building the formatted sizes when nothing would be logged.
            return;
        }
        Object[] details = {
            topicPartition,
            TopologyUtils.formattedPartitionSize(mutableRebalanceContext, topicPartition),
            Integer.valueOf(broker.id()),
            TopologyUtils.formattedBrokerSize(mutableRebalanceContext, broker),
            Integer.valueOf(broker2.id()),
            TopologyUtils.formattedBrokerSize(mutableRebalanceContext, broker2)
        };
        logger.debug("Moved partition {} ({} MB) from broker {} ({} MB) to broker {} ({} MB)", details);
    }

    /**
     * Balances leaders for the given view in two phases, each repeated until a complete sweep
     * changes nothing.
     *
     * <p>Phase 1 hands leadership of a partition on an above-par broker to another broker from
     * that partition's own broker list when the view's leader comparison is strictly greater.
     * Phase 2 calls {@code switchLeaderToOtherBroker} with the below-par brokers and a predicate
     * that requires disk-space and partition constraints to hold in both directions as well as a
     * strictly greater leader comparison.
     *
     * @param mutableRebalanceContext cluster state that the changes are applied to
     * @param clusterView view that defines above/below par and supplies the constraints
     */
    private void leaderTopicFairness(final MutableRebalanceContext mutableRebalanceContext, ClusterView clusterView) {
        ClusterView view = clusterView;

        // Phase 1: change the leader among the partition's existing brokers.
        boolean changedInSweep;
        do {
            changedInSweep = false;
            for (final TopicPartition leaderPartition : view.leadersOnAboveParBrokers()) {
                if (!view.leadersOnAboveParBrokers().contains(leaderPartition)) {
                    continue;
                }
                final ClusterView snapshot = view;
                BiPredicate<Broker> handoverImproves = new BiPredicate<Broker>() {
                    @Override
                    public boolean test(Broker currentLeader, Broker follower) {
                        return snapshot.compareLeaderFairness(leaderPartition, currentLeader, follower).isGreater();
                    }
                };
                if (changeLeadership(mutableRebalanceContext, leaderPartition, handoverImproves)) {
                    changedInSweep = true;
                    view = view.refresh(mutableRebalanceContext);
                }
            }
        } while (changedInSweep);

        // Phase 2: involve replicas that live on below-par brokers.
        boolean switchedInSweep;
        do {
            switchedInSweep = false;
            for (final TopicPartition leaderPartition : view.leadersOnAboveParBrokers()) {
                if (!view.leadersOnAboveParBrokers().contains(leaderPartition)) {
                    continue;
                }
                final ClusterView snapshot = view;
                List<Broker> belowPar = view.brokersWithBelowParLeaderFairness();
                Predicate<Replica> switchAllowed = new Predicate<Replica>() {
                    // First broker in the partition's broker list, captured when the predicate is built.
                    private final Broker aboveParLeaderBroker = mutableRebalanceContext.brokers(leaderPartition).get(0);

                    @Override
                    public boolean test(Replica replica) {
                        Broker otherBroker = replica.broker();
                        TopicPartition otherPartition = replica.topicPartition();
                        if (!snapshot.constraints().obeysDiskSpaceConstraint(leaderPartition, this.aboveParLeaderBroker, otherPartition)) {
                            return false;
                        }
                        if (!snapshot.constraints().obeysDiskSpaceConstraint(otherPartition, otherBroker, leaderPartition)) {
                            return false;
                        }
                        if (!snapshot.constraints().obeysPartitionConstraint(leaderPartition, otherBroker)) {
                            return false;
                        }
                        if (!snapshot.constraints().obeysPartitionConstraint(otherPartition, this.aboveParLeaderBroker)) {
                            return false;
                        }
                        return snapshot.compareLeaderFairness(leaderPartition, this.aboveParLeaderBroker, otherBroker).isGreater();
                    }
                };
                if (switchLeaderToOtherBroker(mutableRebalanceContext, leaderPartition, belowPar, switchAllowed)) {
                    switchedInSweep = true;
                    view = view.refresh(mutableRebalanceContext);
                }
            }
        } while (switchedInSweep);
    }

    /* JADX WARN: Type inference failed for: r0v19, types: [io.confluent.kafka.databalancing.view.ClusterView] */
    /* JADX WARN: Type inference failed for: r0v42, types: [io.confluent.kafka.databalancing.view.ClusterView] */
    private void leaderFairness(final MutableRebalanceContext mutableRebalanceContext, ClusterView clusterView, TopicViewSupplier<?> topicViewSupplier) {
        boolean z = true;
        while (z) {
            z = false;
            for (final TopicPartition topicPartition : clusterView.leadersOnAboveParBrokers()) {
                if (clusterView.leadersOnAboveParBrokers().contains(topicPartition)) {
                    final ClusterView clusterView2 = clusterView;
                    final ?? r0 = topicViewSupplier.get(topicPartition.topic());
                    if (changeLeadership(mutableRebalanceContext, topicPartition, new BiPredicate<Broker>() { // from class: io.confluent.kafka.databalancing.MovesOptimisedRebalancePolicy.6
                        @Override // io.confluent.kafka.databalancing.MovesOptimisedRebalancePolicy.BiPredicate
                        public boolean test(Broker broker, Broker broker2) {
                            return clusterView2.compareLeaderFairness(topicPartition, broker, broker2).isGreater() && r0.compareLeaderFairness(topicPartition, broker, broker2).isGreaterOrEqual();
                        }
                    })) {
                        z = true;
                        clusterView = clusterView.refresh(mutableRebalanceContext);
                        topicViewSupplier.clear();
                    }
                }
            }
        }
        boolean z2 = true;
        while (z2) {
            z2 = false;
            for (final TopicPartition topicPartition2 : clusterView.leadersOnAboveParBrokers()) {
                if (clusterView.leadersOnAboveParBrokers().contains(topicPartition2)) {
                    List<Broker> brokersWithBelowParLeaderFairness = clusterView.brokersWithBelowParLeaderFairness();
                    final ?? r02 = topicViewSupplier.get(topicPartition2.topic());
                    final ClusterView clusterView3 = clusterView;
                    if (switchLeaderToOtherBroker(mutableRebalanceContext, topicPartition2, brokersWithBelowParLeaderFairness, new Predicate<Replica>() { // from class: io.confluent.kafka.databalancing.MovesOptimisedRebalancePolicy.7
                        private final Broker aboveParLeaderBroker;

                        {
                            this.aboveParLeaderBroker = mutableRebalanceContext.brokers(topicPartition2).get(0);
                        }

                        @Override // io.confluent.kafka.databalancing.MovesOptimisedRebalancePolicy.Predicate
                        public boolean test(Replica replica) {
                            Broker broker = replica.broker();
                            TopicPartition topicPartition3 = replica.topicPartition();
                            return clusterView3.constraints().obeysDiskSpaceConstraint(topicPartition2, this.aboveParLeaderBroker, topicPartition3) && clusterView3.constraints().obeysDiskSpaceConstraint(topicPartition3, broker, topicPartition2) && clusterView3.constraints().obeysPartitionConstraint(topicPartition2, broker) && clusterView3.constraints().obeysPartitionConstraint(topicPartition3, this.aboveParLeaderBroker) && clusterView3.compareLeaderFairness(topicPartition2, this.aboveParLeaderBroker, broker).isGreater() && r02.compareLeaderFairness(topicPartition2, this.aboveParLeaderBroker, broker).isGreaterOrEqual();
                        }
                    })) {
                        z2 = true;
                        clusterView = clusterView.refresh(mutableRebalanceContext);
                        topicViewSupplier.clear();
                    }
                }
            }
        }
    }

    /**
     * Attempts to hand leadership of a partition to one of its other replica brokers.
     *
     * <p>The first broker in the partition's broker list is treated as the current leader. The
     * remaining brokers are tried in list order; for the first one accepted by
     * {@code biPredicate.test(currentLeader, candidate)}, {@link #makeLeader} is invoked and the
     * search stops.
     *
     * @param mutableRebalanceContext context that supplies the broker list and receives the change
     * @param topicPartition partition whose leadership may change
     * @param biPredicate called with (current leader, candidate) to decide whether to switch
     * @return {@code true} if a candidate was accepted, {@code false} if none was
     */
    private boolean changeLeadership(MutableRebalanceContext mutableRebalanceContext, TopicPartition topicPartition, BiPredicate<Broker> biPredicate) {
        List<Broker> replicaBrokers = mutableRebalanceContext.brokers(topicPartition);
        Broker currentLeader = replicaBrokers.get(0);
        int replicaCount = replicaBrokers.size();
        // Index 0 is the current leader, so candidates start at index 1.
        for (int i = 1; i < replicaCount; i++) {
            Broker candidate = replicaBrokers.get(i);
            if (!biPredicate.test(currentLeader, candidate)) {
                continue;
            }
            makeLeader(mutableRebalanceContext, topicPartition, candidate);
            return true;
        }
        return false;
    }

    /**
     * Tries to relocate the replica of {@code topicPartition} held by its first listed broker by
     * swapping it with a follower replica hosted on one of the given brokers.
     *
     * <p>Every follower partition on each broker in {@code collection} becomes a swap candidate.
     * Candidates are sorted by how close their partition size is to the size of
     * {@code topicPartition} (closest first), with ties broken by partition size, then broker id,
     * then {@code TopologyUtils.topicPartitionComparator}. The first candidate accepted by
     * {@code predicate} is swapped: {@code topicPartition} is moved from its first listed broker to
     * the candidate's broker, and the candidate's partition is moved the opposite way.
     *
     * @param mutableRebalanceContext context queried for brokers, followers and partition sizes
     * @param topicPartition partition to move off its first listed broker
     * @param collection brokers whose follower replicas are considered as swap partners
     * @param predicate filter deciding whether a candidate replica may be swapped
     * @return {@code true} if a swap was performed, {@code false} if no candidate was accepted
     */
    private boolean switchLeaderToOtherBroker(final MutableRebalanceContext mutableRebalanceContext, TopicPartition topicPartition, Collection<Broker> collection, Predicate<Replica> predicate) {
        Broker currentLeader = mutableRebalanceContext.brokers(topicPartition).get(0);

        // Gather every follower replica on the candidate brokers.
        List<Replica> swapCandidates = new ArrayList<Replica>();
        for (Broker targetBroker : collection) {
            for (TopicPartition followerPartition : mutableRebalanceContext.followers(targetBroker)) {
                swapCandidates.add(new Replica(followerPartition, targetBroker));
            }
        }

        final long referenceSize = mutableRebalanceContext.partitionSize(topicPartition);
        Collections.sort(swapCandidates, new Comparator<Replica>() {
            @Override
            public int compare(Replica left, Replica right) {
                long leftSize = mutableRebalanceContext.partitionSize(left.topicPartition());
                long rightSize = mutableRebalanceContext.partitionSize(right.topicPartition());

                // Primary key: distance of the candidate's size from the reference partition's size.
                int byDistance = Long.compare(Math.abs(referenceSize - leftSize), Math.abs(referenceSize - rightSize));
                if (byDistance != 0) {
                    return byDistance;
                }
                // Tie-breakers: size, then broker id, then topic partition.
                if (leftSize != rightSize) {
                    return Long.compare(leftSize, rightSize);
                }
                if (left.broker().id() != right.broker().id()) {
                    return Integer.compare(left.broker().id(), right.broker().id());
                }
                return TopologyUtils.topicPartitionComparator.compare(left.topicPartition(), right.topicPartition());
            }
        });

        for (Replica candidate : swapCandidates) {
            if (!predicate.test(candidate)) {
                continue;
            }
            Broker swapBroker = candidate.broker();
            movePartition(mutableRebalanceContext, topicPartition, currentLeader, swapBroker);
            movePartition(mutableRebalanceContext, candidate.topicPartition(), swapBroker, currentLeader);
            return true;
        }
        return false;
    }

    /**
     * Makes {@code broker} the leader of {@code topicPartition} and counts the change.
     *
     * <p>If {@code broker} already equals the leader reported by the context, nothing is changed
     * and the leader-move counter is left untouched; the skipped request is reported through the
     * class logger rather than standard output, so it follows the application's logging
     * configuration like every other message of this class.
     *
     * @param mutableRebalanceContext context whose leadership assignment is updated
     * @param topicPartition partition whose leader is being changed
     * @param broker broker that should become the leader
     */
    void makeLeader(MutableRebalanceContext mutableRebalanceContext, TopicPartition topicPartition, Broker broker) {
        Broker previousLeader = mutableRebalanceContext.leader(topicPartition);
        if (broker.equals(previousLeader)) {
            // A no-op request: surface it via SLF4J instead of System.out.
            logger.warn("Leadership change was not made as {} was already the leader for partition {} - see: {}", new Object[]{broker, topicPartition, mutableRebalanceContext.brokers(topicPartition)});
            return;
        }
        mutableRebalanceContext.makeLeader(topicPartition, broker);
        // The broker list is read after the change, so the message shows the resulting assignment.
        logger.debug("Leadership moved brokers: [{} -> {}] for partition {}:{}", new Object[]{previousLeader, broker, topicPartition, mutableRebalanceContext.brokers(topicPartition)});
        this.leadersMoved++;
    }

    /**
     * Converts the context's partition-to-brokers assignment into a partition-to-broker-ids map.
     *
     * <p>For every partition the broker ids are listed in the same order as the brokers in the
     * source assignment. The returned map and lists are newly created.
     *
     * @param rebalanceContext context providing the partition-to-brokers assignment
     * @return a new map from each partition to the ids of its brokers
     */
    private Map<TopicPartition, List<Integer>> partitionToReplicaIds(RebalanceContext rebalanceContext) {
        // Fetch the assignment once; it is used for both sizing and iteration.
        Map<TopicPartition, List<Broker>> partitionToBrokers = rebalanceContext.partitionToBrokers();
        Map<TopicPartition, List<Integer>> replicaIdsByPartition = new HashMap<TopicPartition, List<Integer>>(partitionToBrokers.size());
        for (Map.Entry<TopicPartition, List<Broker>> entry : partitionToBrokers.entrySet()) {
            List<Broker> brokers = entry.getValue();
            List<Integer> brokerIds = new ArrayList<Integer>(brokers.size());
            for (Broker broker : brokers) {
                brokerIds.add(broker.id());
            }
            replicaIdsByPartition.put(entry.getKey(), brokerIds);
        }
        return replicaIdsByPartition;
    }

    /**
     * Returns the current value of this policy's {@code replicasMoved} counter.
     *
     * @return the counter value
     */
    public int replicasMoved() {
        return replicasMoved;
    }

    /**
     * Returns the current value of this policy's {@code leadersMoved} counter, which
     * {@code makeLeader} increments each time it actually changes a partition's leader.
     *
     * @return the counter value
     */
    public int leadersMoved() {
        return leadersMoved;
    }

    /**
     * Writes debug-level diagnostics for the given context: the rack and broker replica/leader
     * counts computed by {@code TopologyUtils}, followed by this policy's move counters.
     *
     * @param rebalanceContext context whose counts are logged
     */
    private void log(RebalanceContext rebalanceContext) {
        // Rack-level counts.
        logger.debug(
                "Racks to replica Counts {}",
                TopologyUtils.rackReplicaCounts(rebalanceContext, null, null));
        logger.debug(
                "Racks to leader Counts {}",
                TopologyUtils.rackLeaderCounts(rebalanceContext, null, null));

        // Broker-level counts over all brokers known to the context.
        logger.debug(
                "Broker to replica Counts {}",
                TopologyUtils.brokerReplicaCounts(rebalanceContext, rebalanceContext.allBrokers(), null));
        logger.debug(
                "Broker to leader Counts {}",
                TopologyUtils.brokerLeaderCounts(rebalanceContext, rebalanceContext.allBrokers(), null));

        // Move counters accumulated by this policy instance.
        logger.debug("Number of replicas moves {}", replicasMoved);
        logger.debug("Number of leader moves {}", leadersMoved);
    }
}
