package io.confluent.kafka.databalancing;

import io.confluent.kafka.databalancing.RebalancePolicy;
import io.confluent.kafka.databalancing.metric.Metrics;
import io.confluent.kafka.databalancing.metric.MetricsCollector;
import io.confluent.kafka.databalancing.throttle.Throttle;
import io.confluent.kafka.databalancing.throttle.Throttler;
import io.confluent.kafka.databalancing.topology.Broker;
import io.confluent.kafka.databalancing.topology.ReplicaAssignment;
import io.confluent.kafka.databalancing.topology.TopologyUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import kafka.admin.AdminUtils;
import kafka.admin.BrokerMetadata;
import kafka.admin.RackAwareMode$Enforced$;
import kafka.common.AdminCommandFailedException;
import kafka.common.TopicAndPartition;
import kafka.controller.ReassignedPartitionsContext;
import kafka.utils.ZkUtils;
import kafka.utils.ZkUtils$;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.security.JaasUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConversions;

/* loaded from: input_file:io/confluent/kafka/databalancing/DefaultRebalancer.class */
/**
 * Rebalancer that reads cluster state through {@link ZkUtils}, computes proposals with a
 * {@link MovesOptimisedRebalancePolicy}, and starts a reassignment by writing the changed
 * partition assignments to the reassign-partitions path. Replication throttling is delegated
 * to a {@link Throttle}.
 */
public class DefaultRebalancer extends AbstractRebalancer {
    private static final Logger logger = LoggerFactory.getLogger(DefaultRebalancer.class);
    private final ZkUtils zkUtils;
    private final RebalancePolicy rebalancePolicy;
    private final Throttle throttle;

    /**
     * Creates a rebalancer with its own {@link ZkUtils} instance.
     *
     * @param str connection string handed to {@link ZkUtils#apply}
     */
    public DefaultRebalancer(String str) {
        // NOTE(review): the second argument (presumably the ZooKeeper session timeout) reuses a
        // MetricsCollector constant; this looks like a coincidental constant match - confirm.
        this(ZkUtils.apply(str, MetricsCollector.CONSUMER_REPLAY_PERIOD_MS, 7000, JaasUtils.isZkSecurityEnabled()));
    }

    /**
     * Creates a rebalancer on top of an existing {@link ZkUtils}, with a {@link Throttler}
     * built from the same instance.
     *
     * @param zkUtils ZooKeeper access used for all reads and writes
     */
    DefaultRebalancer(ZkUtils zkUtils) {
        this(zkUtils, new Throttler(zkUtils));
    }

    /**
     * Creates a rebalancer with explicit collaborators.
     *
     * @param zkUtils   ZooKeeper access used for all reads and writes
     * @param throttler throttle engaged when a rebalance starts and disengaged when none is running
     */
    DefaultRebalancer(ZkUtils zkUtils, Throttler throttler) {
        this.rebalancePolicy = new MovesOptimisedRebalancePolicy();
        this.zkUtils = zkUtils;
        this.throttle = throttler;
    }

    /**
     * Builds a rebalance proposal from the current assignment, the registered brokers and
     * freshly collected metrics.
     *
     * @param list             forwarded to {@code brokersToRemove(...)} together with the broker
     *                         metadata and the current assignment
     * @param rebalancerConfig configuration for metrics collection and the rebalance policy
     * @return the proposal, carrying the inputs it was computed from
     * @throws AdminCommandFailedException if a {@link TimeoutException} is raised while the
     *                                     proposal is being computed; the timeout is kept as the cause
     */
    @Override
    public ProposedRebalance proposeRebalance(List<Integer> list, RebalancerConfig rebalancerConfig) {
        ReplicaAssignment currentAssignment = currentAssignment();
        List<BrokerMetadata> allBrokers = allBrokers();
        List<Broker> brokers = new ArrayList<>(allBrokers.size());
        for (BrokerMetadata metadata : allBrokers) {
            brokers.add(new Broker(metadata.id()));
        }
        try {
            Metrics metrics = metrics(rebalancerConfig, currentAssignment.topicPartitions(), brokers);
            List<Broker> brokersToRemove = brokersToRemove(allBrokers, currentAssignment, list);
            RebalancePolicy.Config policyConfig = policyConfig(rebalancerConfig, metrics, brokers);
            return new ProposedRebalance(
                    allBrokers,
                    currentAssignment,
                    this.rebalancePolicy.rebalancePartitions(
                            allBrokers,
                            currentAssignment,
                            currentAssignment.replicationFactors(),
                            metrics,
                            policyConfig,
                            brokersToRemove),
                    metrics,
                    brokersToRemove,
                    policyConfig);
        } catch (TimeoutException e) {
            // Chain the cause so the original stack trace is not lost.
            throw new AdminCommandFailedException("Failed to retrieve metrics data. " + e.getMessage(), e);
        }
    }

    /**
     * Engages the throttle and writes the partitions whose replicas differ between the two
     * assignments to the reassign-partitions path.
     *
     * @param replicaAssignment  assignment to compare against (the "before" side)
     * @param replicaAssignment2 target assignment (the "after" side)
     * @param j                  throttle rate in bytes/sec
     * @throws AdminCommandFailedException if the reassign-partitions node already exists; the
     *                                     throttle has already been engaged at that point
     */
    @Override
    public void startRebalance(ReplicaAssignment replicaAssignment, ReplicaAssignment replicaAssignment2, long j) {
        this.throttle.engage(j, replicaAssignment, replicaAssignment2);
        try {
            String reassignPath = ZkUtils.ReassignPartitionsPath();
            this.zkUtils.createPersistentPath(
                    reassignPath,
                    retainChangedAssignments(replicaAssignment, replicaAssignment2).toJson(),
                    this.zkUtils.defaultAcls(reassignPath));
        } catch (ZkNodeExistsException e) {
            // The throttle was engaged before the write was attempted, so the new rate is
            // reported even though no new reassignment could be started.
            System.out.println("The throttle rate was updated to " + j + " bytes/sec.");
            throw partitionsBeingReassignedException();
        }
    }

    /**
     * Re-engages the throttle at a new rate for a rebalance that is already running and prints
     * the affected partitions.
     *
     * @param replicaAssignment assignment of the rebalance in progress
     * @param j                 throttle rate in bytes/sec
     * @return {@code false} if {@code replicaAssignment} is empty (nothing is done),
     *         {@code true} otherwise
     */
    @Override
    public boolean maybeUpdateReplicationQuota(ReplicaAssignment replicaAssignment, long j) {
        if (replicaAssignment.isEmpty()) {
            return false;
        }
        this.throttle.engage(j, currentAssignment(), replicaAssignment);
        System.out.println("The throttle rate was updated to " + j + " bytes/sec.");
        System.out.println("A rebalance is currently in progress for:");
        for (String line : TopologyUtils.partitionsByTopicToLines(replicaAssignment.topicPartitions())) {
            System.out.println("\t" + line);
        }
        return true;
    }

    /**
     * Returns the subset of {@code replicaAssignment2} whose replica list differs from the one
     * in {@code replicaAssignment}.
     *
     * @param replicaAssignment  assignment to compare against
     * @param replicaAssignment2 assignment whose partitions are examined
     * @return a new assignment holding only the changed partitions with their replicas from
     *         {@code replicaAssignment2}
     */
    public ReplicaAssignment retainChangedAssignments(ReplicaAssignment replicaAssignment, ReplicaAssignment replicaAssignment2) {
        Map<TopicPartition, List<Integer>> changed = new HashMap<>();
        for (TopicPartition topicPartition : replicaAssignment2.topicPartitions()) {
            // NOTE(review): if replicas() can return null for a partition that is missing from
            // replicaAssignment, the equals() call below throws - confirm against ReplicaAssignment.
            List<Integer> before = replicaAssignment.replicas(topicPartition);
            List<Integer> after = replicaAssignment2.replicas(topicPartition);
            if (!before.equals(after)) {
                changed.put(topicPartition, after);
            }
        }
        return new ReplicaAssignment(changed);
    }

    /**
     * Disengages the throttle.
     *
     * @return the result reported by {@link Throttle#disengage()}
     */
    @Override
    public boolean disengageThrottle() {
        return this.throttle.disengage();
    }

    /**
     * Reports the partitions currently being reassigned, with their new replicas. As a side
     * effect the throttle is disengaged when no reassignment is in progress.
     *
     * @return status wrapping the in-progress reassignment (possibly empty)
     */
    @Override
    public RebalanceStatus status() {
        Map<TopicAndPartition, ReassignedPartitionsContext> inProgress =
                JavaConversions.mapAsJavaMap(this.zkUtils.getPartitionsBeingReassigned());
        Map<TopicPartition, List<Integer>> reassigning = new HashMap<>();
        for (Map.Entry<TopicAndPartition, ReassignedPartitionsContext> entry : inProgress.entrySet()) {
            TopicAndPartition partition = entry.getKey();
            reassigning.put(
                    new TopicPartition(partition.topic(), partition.partition()),
                    ScalaUtils.fromScalaSeqInt(entry.getValue().newReplicas()));
        }
        if (reassigning.isEmpty()) {
            disengageThrottle();
        }
        return new RebalanceStatus(new ReplicaAssignment(reassigning));
    }

    /**
     * Collects metrics for the given partitions and brokers using a {@link MetricsCollector}
     * configured from {@code rebalancerConfig}.
     *
     * @param rebalancerConfig supplies the metrics topic, collection timeout and consumer properties
     * @param collection       partitions to collect metrics for
     * @param collection2      brokers to collect metrics for
     * @return the collected metrics
     * @throws TimeoutException propagated from the collector
     */
    private Metrics metrics(RebalancerConfig rebalancerConfig, Collection<TopicPartition> collection, Collection<Broker> collection2) throws TimeoutException {
        MetricsCollector collector = new MetricsCollector(
                rebalancerConfig.getString(RebalancerConfig.METRICS_TOPIC_CONFIG),
                rebalancerConfig.getInt(RebalancerConfig.METRICS_COLLECTION_TIMEOUT_MS_CONFIG).intValue());
        // Read the cluster id once so the emptiness check and the value come from the same read.
        Option<String> clusterIdOption = this.zkUtils.getClusterId();
        String clusterId = clusterIdOption.isEmpty() ? null : clusterIdOption.get();
        return collector.collectMetrics(rebalancerConfig.consumerProps(), collection, clusterId, collection2);
    }

    /**
     * Reads the replica assignment of all topics, leaving out topics listed under the
     * delete-topics path.
     *
     * @return the current assignment without topics marked for deletion
     */
    private ReplicaAssignment currentAssignment() {
        ReplicaAssignment assignment = ReplicaAssignment.fromScala(
                this.zkUtils.getReplicaAssignmentForTopics(this.zkUtils.getAllTopics()));
        HashSet<String> topicsMarkedForDeletion = new HashSet<>(JavaConversions.seqAsJavaList(
                this.zkUtils.getChildrenParentMayNotExist(ZkUtils$.MODULE$.DeleteTopicsPath())));
        if (!topicsMarkedForDeletion.isEmpty()) {
            logger.info("Ignoring topics marked for deletion: {}", Utils.mkString(topicsMarkedForDeletion, ", "));
        }
        return assignment.cloneWithoutTopics(topicsMarkedForDeletion);
    }

    /**
     * Fetches broker metadata through {@link AdminUtils#getBrokerMetadatas} with enforced
     * rack-aware mode and no broker filter.
     *
     * @return metadata of the brokers returned by the lookup
     */
    private List<BrokerMetadata> allBrokers() {
        return JavaConversions.seqAsJavaList(AdminUtils.getBrokerMetadatas(this.zkUtils, RackAwareMode$Enforced$.MODULE$, Option.empty()));
    }

    /**
     * Builds the "partitions being reassigned" exception from the reassignment currently
     * recorded in ZooKeeper.
     *
     * @return the exception to throw; it is not thrown here
     */
    private AdminCommandFailedException partitionsBeingReassignedException() {
        Map<TopicAndPartition, ReassignedPartitionsContext> inProgress =
                JavaConversions.mapAsJavaMap(this.zkUtils.getPartitionsBeingReassigned());
        HashSet<TopicPartition> partitions = new HashSet<>();
        for (TopicAndPartition topicAndPartition : inProgress.keySet()) {
            partitions.add(new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition()));
        }
        return partitionsBeingReassignedException(partitions);
    }

    /**
     * Closes the underlying {@link ZkUtils}. Best effort: any failure is logged and not rethrown.
     */
    @Override
    public void close() {
        try {
            this.zkUtils.close();
        } catch (Throwable th) {
            logger.warn("An exception was thrown while closing zkUtils", th);
        }
    }
}
