package io.confluent.kafka.databalancing;

import com.google.common.base.Strings;
import io.confluent.kafka.databalancing.RebalancePolicy;
import io.confluent.kafka.databalancing.metric.Metrics;
import io.confluent.kafka.databalancing.metric.MetricsCollector;
import io.confluent.kafka.databalancing.throttle.Throttle;
import io.confluent.kafka.databalancing.throttle.Throttler;
import io.confluent.kafka.databalancing.topology.Broker;
import io.confluent.kafka.databalancing.topology.BrokerMetadata;
import io.confluent.kafka.databalancing.topology.ClusterAssignment;
import io.confluent.kafka.databalancing.topology.ClusterReassignment;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.common.AdminCommandFailedException;
import kafka.common.TopicPlacement;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/databalancing/DefaultRebalancer.class */
/**
 * Default {@link Rebalancer} implementation.
 *
 * <p>Reads the current cluster state through a {@link RebalancerAdmin}, collects metrics through a
 * {@link MetricsCollector}, asks a {@link RebalancePolicy} (always a
 * {@link MovesOptimisedRebalancePolicy}) for a new assignment, and drives a {@link Throttle} while
 * a reassignment is being started or updated.
 */
public class DefaultRebalancer extends AbstractRebalancer {
    private static final Logger logger = LoggerFactory.getLogger(DefaultRebalancer.class);
    protected final RebalancerAdmin rebalancerAdmin;
    protected final Throttle throttle;
    private final MetricsCollector metricsCollector;
    private final RebalancePolicy rebalancePolicy;

    /**
     * Creates a rebalancer with a {@link MetricsCollector} built from {@code rebalancerConfig} and a
     * {@link Throttler} built on top of {@code rebalancerAdmin}.
     *
     * @param rebalancerConfig configuration passed to the superclass and to the metrics collector
     * @param rebalancerAdmin admin client used for all cluster reads and writes
     */
    public DefaultRebalancer(RebalancerConfig rebalancerConfig, RebalancerAdmin rebalancerAdmin) {
        this(rebalancerConfig, rebalancerAdmin, new MetricsCollector(rebalancerConfig), new Throttler(rebalancerAdmin));
    }

    /**
     * Creates a rebalancer with explicitly supplied collaborators.
     *
     * @param rebalancerConfig configuration passed to the superclass
     * @param rebalancerAdmin admin client used for all cluster reads and writes
     * @param metricsCollector source of the metrics used when proposing a rebalance
     * @param throttle throttle engaged when a rebalance is started or its rate is updated
     */
    public DefaultRebalancer(RebalancerConfig rebalancerConfig, RebalancerAdmin rebalancerAdmin, MetricsCollector metricsCollector, Throttle throttle) {
        super(rebalancerConfig);
        this.rebalancePolicy = new MovesOptimisedRebalancePolicy();
        this.rebalancerAdmin = rebalancerAdmin;
        this.metricsCollector = metricsCollector;
        this.throttle = throttle;
    }

    /**
     * Builds the broker map used for planning: every broker the admin client reports metadata for,
     * plus an "offline" placeholder for each broker that appears in the assignment but has no
     * metadata entry.
     */
    private Map<Broker, BrokerMetadata> allBrokers(ClusterAssignment clusterAssignment) {
        Map<Broker, BrokerMetadata> brokersMap = Utils.brokersMap(this.rebalancerAdmin.getAllBrokerMetadata());
        for (Broker assignedBroker : clusterAssignment.brokers()) {
            // Only brokers missing from the metadata are added; known brokers are left untouched.
            brokersMap.computeIfAbsent(assignedBroker, missing -> BrokerMetadata.offline(missing.id()));
        }
        return brokersMap;
    }

    /**
     * Computes a proposed rebalance for the topics selected by {@code commandContext}.
     *
     * <p>An empty {@code topicsToRebalance} is treated as "no explicit topic filter". When
     * {@code replicaPlacementOnly} is set, the policy is only asked to enforce placement constraints;
     * otherwise it is asked to rebalance partitions.
     *
     * @param commandContext options of the command being executed
     * @return the proposal, together with the inputs (brokers, placements, current assignment,
     *     metrics, brokers to remove, policy config) it was computed from
     */
    @Override // io.confluent.kafka.databalancing.Rebalancer
    public ProposedRebalance proposeRebalance(CommandContext commandContext) {
        Optional<Set<String>> topics = commandContext.topicsToRebalance.isEmpty() ? Optional.empty() : Optional.of(commandContext.topicsToRebalance);
        ClusterAssignment currentAssignment = currentAssignment(topics, commandContext.excludeInternalTopics);
        Map<String, TopicPlacement> topicPlacementMap = getTopicPlacementMap(topics);
        Map<Broker, BrokerMetadata> allBrokers = allBrokers(currentAssignment);
        List<Broker> brokersToRemove = brokersToRemove(allBrokers, currentAssignment, commandContext);

        // Metrics are only collected for brokers that are not marked offline.
        Set<Broker> onlineBrokers = allBrokers.entrySet().stream()
                .filter(entry -> !entry.getValue().isOffline())
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
        Metrics metrics = metrics(currentAssignment.topicPartitions(), onlineBrokers);

        RebalancePolicy.Config policyConfig = policyConfig(metrics, allBrokers);
        MutableRebalanceContext context = DefaultRebalanceContext.create(allBrokers, currentAssignment, currentAssignment.replicationFactors(), topicPlacementMap, metrics, policyConfig, brokersToRemove);
        return new ProposedRebalance(
                allBrokers,
                topicPlacementMap,
                currentAssignment,
                commandContext.replicaPlacementOnly
                        ? this.rebalancePolicy.enforcePlacementConstraints(context)
                        : this.rebalancePolicy.rebalancePartitions(context),
                metrics,
                brokersToRemove,
                policyConfig);
    }

    /**
     * Engages the throttle for the proposal's alive brokers and reassignment, then submits the
     * proposed partition reassignment.
     *
     * @param proposedRebalance proposal to execute
     * @param j value handed to {@link Throttle#engage} as the throttle rate (presumably bytes/sec,
     *     matching the message printed by {@link #maybeUpdateReplicationQuota})
     */
    @Override // io.confluent.kafka.databalancing.Rebalancer
    public void startRebalance(ProposedRebalance proposedRebalance, long j) {
        // The throttle is engaged before the reassignment is created.
        this.throttle.engage(j, proposedRebalance.aliveBrokers(), proposedRebalance.reassignment());
        createPartitionAssignment(proposedRebalance);
    }

    /**
     * Cancels the reassignment currently in progress, if any. Does nothing when the current
     * reassignment has no topic partitions.
     */
    @Override // io.confluent.kafka.databalancing.Rebalancer
    public void cancelRebalance() {
        ClusterReassignment inProgress = currentReassignment();
        if (!inProgress.topicPartitions().isEmpty()) {
            this.rebalancerAdmin.cancelPartitionReassignment(inProgress.topicPartitions());
        }
    }

    /** Submits the proposal's assignment changes to the admin client as a new partition reassignment. */
    private void createPartitionAssignment(ProposedRebalance proposedRebalance) {
        this.rebalancerAdmin.createPartitionReassignment(proposedRebalance.proposedAssignmentChanges().asNewPartitionReassignmentMap());
    }

    /**
     * Re-engages the throttle at a new rate if a reassignment is in progress, and reports the
     * change on standard output.
     *
     * @param j new throttle rate, printed as bytes/sec
     * @param z not used by this implementation
     * @return {@code true} if a reassignment was in progress and the throttle was updated,
     *     {@code false} if there was nothing to update
     */
    @Override // io.confluent.kafka.databalancing.Rebalancer
    public boolean maybeUpdateReplicationQuota(long j, boolean z) {
        ClusterReassignment inProgress = currentReassignment();
        if (inProgress.isEmpty()) {
            return false;
        }
        this.throttle.engage(j, this.rebalancerAdmin.getAllBrokersInCluster(), inProgress);
        // User-facing progress output is written to stdout rather than the logger.
        System.out.println("The throttle rate was updated to " + j + " bytes/sec.");
        System.out.println("A rebalance is currently in progress for:");
        inProgress.printPartitionsByTopic();
        return true;
    }

    /**
     * Disengages the throttle.
     *
     * @return whatever {@link Throttle#disengage()} returns
     */
    @Override // io.confluent.kafka.databalancing.Rebalancer
    public boolean disengageThrottle() {
        return this.throttle.disengage();
    }

    /**
     * Returns the reassignment currently reported by the admin client.
     *
     * <p>Side effect: when that reassignment is empty, the throttle is disengaged before returning.
     */
    @Override // io.confluent.kafka.databalancing.Rebalancer
    public ClusterReassignment currentReassignment() {
        ClusterReassignment inProgress = this.rebalancerAdmin.currentReassignment();
        if (inProgress.isEmpty()) {
            disengageThrottle();
        }
        return inProgress;
    }

    /**
     * Collects metrics for the given partitions and brokers.
     *
     * @param set topic partitions to collect metrics for
     * @param set2 brokers to collect metrics from
     * @throws AdminCommandFailedException if metric collection times out; the original
     *     {@link TimeoutException} is attached as the cause
     */
    private Metrics metrics(Set<TopicPartition> set, Set<Broker> set2) {
        try {
            return this.metricsCollector.collectMetrics(set, this.rebalancerAdmin.getClusterId(), set2);
        } catch (TimeoutException e) {
            // Keep the cause so the timeout's stack trace is not lost.
            throw new AdminCommandFailedException("Failed to retrieve metrics data. " + e.getMessage(), e);
        }
    }

    /**
     * Returns the current cluster assignment with topics that are marked for deletion removed.
     *
     * @param optional topic filter forwarded to the admin client
     * @param z flag forwarded to the admin client (callers in this class pass
     *     {@code excludeInternalTopics})
     */
    ClusterAssignment currentAssignment(Optional<Set<String>> optional, boolean z) {
        ClusterAssignment assignment = this.rebalancerAdmin.currentAssignment(optional, z);
        Set<String> deletedTopics = this.rebalancerAdmin.getAllDeletedTopicsInCluster();
        if (!deletedTopics.isEmpty()) {
            logger.info("Ignoring topics marked for deletion: {}", Utils.mkString(deletedTopics, ", "));
        }
        return assignment.cloneWithoutTopics(deletedTopics);
    }

    /**
     * Tells whether a topic should be treated as internal.
     *
     * <p>A topic is internal when it is listed in {@code collection} or its name starts with
     * {@code "_confluent"}. A {@code null} name is treated as the empty string.
     *
     * @param str topic name, may be {@code null}
     * @param collection names of topics to treat as internal (assumed to be the set of known
     *     internal topic names; confirm against callers)
     * @return {@code true} if the topic is internal
     */
    public static boolean isInternalTopic(String str, Collection<String> collection) {
        String topic = Strings.nullToEmpty(str);
        // Membership means internal; the previous negated check flagged every unlisted topic.
        return collection.contains(topic) || topic.startsWith("_confluent");
    }

    /**
     * Builds a map from topic name to its parsed placement constraint.
     *
     * <p>Topics without a {@code confluent.placement.constraints} config, or whose value does not
     * parse to a placement, are left out of the map.
     *
     * @param optional topics to inspect; when empty, all topics in the cluster are fetched
     */
    private Map<String, TopicPlacement> getTopicPlacementMap(Optional<Set<String>> optional) {
        // orElseGet: only ask the cluster for all topics when no explicit topic set was given.
        Set<String> topics = optional.orElseGet(() -> this.rebalancerAdmin.getAllTopicsInCluster(true));
        Map<String, Map<String, Optional<String>>> topicConfigs = this.rebalancerAdmin.fetchTopicConfigs(topics);
        Map<String, TopicPlacement> placements = new HashMap<>(topicConfigs.size());
        for (Map.Entry<String, Map<String, Optional<String>>> entry : topicConfigs.entrySet()) {
            entry.getValue()
                    .getOrDefault("confluent.placement.constraints", Optional.empty())
                    .flatMap(TopicPlacement::parse)
                    .ifPresent(topicPlacement -> placements.put(entry.getKey(), topicPlacement));
        }
        return placements;
    }

    /**
     * Closes the underlying admin client. Failures are logged at WARN level and not propagated, so
     * closing is best-effort.
     */
    @Override // io.confluent.kafka.databalancing.Rebalancer, java.lang.AutoCloseable
    public void close() {
        try {
            this.rebalancerAdmin.close();
        } catch (Throwable th) {
            logger.warn("An exception was thrown while closing rebalancerAdmin", th);
        }
    }
}
