package io.confluent.kafka.databalancing;

import io.confluent.common.utils.SystemTime;
import io.confluent.common.utils.Time;
import io.confluent.kafka.databalancing.metric.MetricsCollector;
import io.confluent.kafka.databalancing.throttle.Throttle;
import io.confluent.kafka.databalancing.topology.Broker;
import io.confluent.kafka.databalancing.topology.ClusterAssignment;
import io.confluent.kafka.databalancing.topology.ClusterReassignment;
import io.confluent.kafka.databalancing.topology.PartitionAssignment;
import io.confluent.kafka.databalancing.topology.PartitionReassignment;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/databalancing/IncrementalRebalancer.class */
public class IncrementalRebalancer extends DefaultRebalancer {
    private static final Logger log = LoggerFactory.getLogger(IncrementalRebalancer.class);
    private final int maxConcurrentMovesPerLeader;
    private final Time time;

    /* loaded from: input_file:io/confluent/kafka/databalancing/IncrementalRebalancer$IncrementalRebalanceExecutor.class */
    private static class IncrementalRebalanceExecutor implements Runnable {
        private static final int MIN_BACKOFF_MS = 100;
        private static final int TIMEOUT_MS = 30000;
        private final RebalancerAdmin rebalancerAdmin;
        private final Throttle throttle;
        private final int maxConcurrentMovesByLeader;
        private final Time time;
        private Map<TopicPartition, PendingReassignment> retryReassignments;
        private long retryDeadlineMs;
        private final Map<Broker, LeaderReassignments> reassignments = new HashMap();
        private final int maxBackoffMs = 5000;
        private int backoffMs = MIN_BACKOFF_MS;

        IncrementalRebalanceExecutor(ProposedRebalance proposedRebalance, RebalancerAdmin rebalancerAdmin, Throttle throttle, Time time, int i) {
            this.rebalancerAdmin = rebalancerAdmin;
            this.throttle = throttle;
            this.time = time;
            this.maxConcurrentMovesByLeader = i;
            buildReassignments(proposedRebalance);
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!this.reassignments.isEmpty()) {
                maybeSubmitReassignment();
                backoff();
                if (this.retryReassignments == null) {
                    checkCompletion();
                }
            }
        }

        private Map<TopicPartition, PendingReassignment> buildNextReassignment() {
            long milliseconds = this.time.milliseconds();
            if (this.retryReassignments != null) {
                if (milliseconds > this.retryDeadlineMs) {
                    throw new ReassignmentFailedException("Failed to submit reassignment " + this.retryReassignments + " after 30000ms");
                }
                printNewReassignment("Retry", this.retryReassignments);
                return this.retryReassignments;
            }
            HashMap hashMap = new HashMap();
            Iterator<LeaderReassignments> it = this.reassignments.values().iterator();
            while (it.hasNext()) {
                buildNewAssignment(it.next(), hashMap);
            }
            if (!hashMap.isEmpty()) {
                printNewReassignment("Begin", hashMap);
                this.retryDeadlineMs = milliseconds + 30000;
                this.retryReassignments = hashMap;
            }
            return hashMap;
        }

        private void printNewReassignment(String str, Map<TopicPartition, PendingReassignment> map) {
            for (Map.Entry<TopicPartition, PendingReassignment> entry : map.entrySet()) {
                PendingReassignment value = entry.getValue();
                System.out.println(str + " reassignment of " + entry.getKey() + " from " + value.sourceAssignment + " to " + value.targetAssignment);
            }
        }

        private void maybeSubmitReassignment() {
            Map<TopicPartition, PendingReassignment> buildNextReassignment = buildNextReassignment();
            if (buildNextReassignment.isEmpty()) {
                return;
            }
            try {
                this.throttle.throttleReplicas(reassignmentFromPendingMap(buildNextReassignment));
                submitReassignment(buildNextReassignment);
                this.retryReassignments = null;
                resetBackoff();
            } catch (RuntimeException e) {
                IncrementalRebalancer.log.warn("Failed to submit reassignment of partitions {}", buildNextReassignment.keySet(), e);
            }
        }

        private ClusterReassignment reassignmentFromPendingMap(Map<TopicPartition, PendingReassignment> map) {
            return new ClusterReassignment((Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry -> {
                PendingReassignment pendingReassignment = (PendingReassignment) entry.getValue();
                return new PartitionReassignment(pendingReassignment.sourceAssignment.replicaIds(), pendingReassignment.targetAssignment.replicaIds());
            })));
        }

        private void submitReassignment(Map<TopicPartition, PendingReassignment> map) {
            this.rebalancerAdmin.createPartitionReassignment((Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry -> {
                PartitionAssignment partitionAssignment = ((PendingReassignment) entry.getValue()).targetAssignment;
                return NewPartitionReassignment.ofReplicasAndObservers(partitionAssignment.replicaIds(), partitionAssignment.observerIds());
            })));
        }

        private void checkCompletion() {
            Set<TopicPartition> set = this.rebalancerAdmin.currentReassignment().topicPartitions();
            ArrayList arrayList = new ArrayList();
            HashMap hashMap = new HashMap();
            for (Map.Entry<Broker, LeaderReassignments> entry : this.reassignments.entrySet()) {
                Broker key = entry.getKey();
                LeaderReassignments value = entry.getValue();
                hashMap.putAll(value.checkAndComplete(set));
                if (value.numRemaining() == 0) {
                    arrayList.add(key);
                }
            }
            for (Map.Entry<TopicPartition, PendingReassignment> entry2 : hashMap.entrySet()) {
                PendingReassignment value2 = entry2.getValue();
                System.out.println("Completed reassignment of " + entry2.getKey() + " from " + value2.sourceAssignment + " to " + value2.targetAssignment);
            }
            this.throttle.unthrottleReplicas(reassignmentFromPendingMap(hashMap));
            this.reassignments.keySet().removeAll(arrayList);
        }

        private void buildNewAssignment(LeaderReassignments leaderReassignments, Map<TopicPartition, PendingReassignment> map) {
            PendingReassignment poll;
            while (leaderReassignments.numInProgress() < this.maxConcurrentMovesByLeader && (poll = leaderReassignments.poll()) != null) {
                map.put(poll.partition, poll);
            }
        }

        private void resetBackoff() {
            this.backoffMs = MIN_BACKOFF_MS;
        }

        private void backoff() {
            this.time.sleep(this.backoffMs);
            this.backoffMs = Math.min(this.backoffMs * 2, 5000);
        }

        private void buildReassignments(ProposedRebalance proposedRebalance) {
            ClusterAssignment currentAssignment = proposedRebalance.currentAssignment();
            for (Map.Entry<TopicPartition, PartitionAssignment> entry : proposedRebalance.proposedAssignment().asMap().entrySet()) {
                TopicPartition key = entry.getKey();
                PartitionAssignment assignment = currentAssignment.assignment(key);
                Broker broker = assignment.preferredLeader().get();
                PartitionAssignment value = entry.getValue();
                if (!value.equals(assignment)) {
                    this.reassignments.computeIfAbsent(broker, broker2 -> {
                        return new LeaderReassignments();
                    }).offer(new PendingReassignment(key, assignment, value, proposedRebalance.partitionSize(key)));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/databalancing/IncrementalRebalancer$LeaderReassignments.class */
    public static class LeaderReassignments {
        private Map<TopicPartition, PendingReassignment> inProgress;
        private PriorityQueue<PendingReassignment> awaiting;

        private LeaderReassignments() {
            this.inProgress = new HashMap();
            this.awaiting = new PriorityQueue<>();
        }

        PendingReassignment poll() {
            PendingReassignment poll = this.awaiting.poll();
            if (poll != null) {
                this.inProgress.put(poll.partition, poll);
            }
            return poll;
        }

        void offer(PendingReassignment pendingReassignment) {
            this.awaiting.offer(pendingReassignment);
        }

        Map<TopicPartition, PendingReassignment> checkAndComplete(Set<TopicPartition> set) {
            List<TopicPartition> list = (List) this.inProgress.keySet().stream().filter(topicPartition -> {
                return !set.contains(topicPartition);
            }).collect(Collectors.toList());
            if (list.isEmpty()) {
                return Collections.emptyMap();
            }
            HashMap hashMap = new HashMap(list.size());
            for (TopicPartition topicPartition2 : list) {
                hashMap.put(topicPartition2, this.inProgress.remove(topicPartition2));
            }
            return hashMap;
        }

        int numInProgress() {
            return this.inProgress.size();
        }

        int numRemaining() {
            return this.inProgress.size() + this.awaiting.size();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafka/databalancing/IncrementalRebalancer$PendingReassignment.class */
    public static class PendingReassignment implements Comparable<PendingReassignment> {
        final TopicPartition partition;
        final PartitionAssignment sourceAssignment;
        final PartitionAssignment targetAssignment;
        final long sizeInBytes;

        private PendingReassignment(TopicPartition topicPartition, PartitionAssignment partitionAssignment, PartitionAssignment partitionAssignment2, long j) {
            this.partition = topicPartition;
            this.sourceAssignment = partitionAssignment;
            this.targetAssignment = partitionAssignment2;
            this.sizeInBytes = j;
        }

        @Override // java.lang.Comparable
        public int compareTo(PendingReassignment pendingReassignment) {
            int compare = Long.compare(this.sizeInBytes, pendingReassignment.sizeInBytes);
            if (compare != 0) {
                return compare;
            }
            int compareTo = this.partition.topic().compareTo(pendingReassignment.partition.topic());
            return compareTo != 0 ? compareTo : Integer.compare(this.partition.partition(), pendingReassignment.partition.partition());
        }
    }

    public IncrementalRebalancer(RebalancerConfig rebalancerConfig, RebalancerAdmin rebalancerAdmin) {
        super(rebalancerConfig, rebalancerAdmin);
        this.time = new SystemTime();
        this.maxConcurrentMovesPerLeader = rebalancerConfig.getInt(RebalancerConfig.MAX_CONCURRENT_LEADER_MOVES).intValue();
    }

    public IncrementalRebalancer(RebalancerConfig rebalancerConfig, RebalancerAdmin rebalancerAdmin, MetricsCollector metricsCollector, Throttle throttle, Time time) {
        super(rebalancerConfig, rebalancerAdmin, metricsCollector, throttle);
        this.time = time;
        this.maxConcurrentMovesPerLeader = rebalancerConfig.getInt(RebalancerConfig.MAX_CONCURRENT_LEADER_MOVES).intValue();
    }

    @Override // io.confluent.kafka.databalancing.DefaultRebalancer, io.confluent.kafka.databalancing.Rebalancer
    public void startRebalance(ProposedRebalance proposedRebalance, long j) {
        IncrementalRebalanceExecutor incrementalRebalanceExecutor = new IncrementalRebalanceExecutor(proposedRebalance, this.rebalancerAdmin, this.throttle, this.time, this.maxConcurrentMovesPerLeader);
        this.throttle.limit(j, proposedRebalance.reassignment().brokers());
        incrementalRebalanceExecutor.run();
        this.throttle.disengage();
    }
}
