package io.confluent.kafka.databalancing.throttle;

import io.confluent.kafka.databalancing.topology.Broker;
import io.confluent.kafka.databalancing.topology.ReplicaAssignment;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.admin.AdminUtilities;
import kafka.log.LogConfig;
import kafka.server.ConfigType;
import kafka.server.DynamicConfig$Broker$;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;
import scala.collection.Seq;

/* loaded from: input_file:io/confluent/kafka/databalancing/throttle/Throttler.class */
/**
 * {@link Throttle} implementation that reads and rewrites broker and topic entity configs
 * through {@link AdminUtilities}, using a {@link ZkUtils} handle.
 *
 * <p>Two groups of properties are managed: the per-broker rate properties
 * ({@link #LEADER_RATE_PROP}, {@link #FOLLOWER_RATE_PROP}) and the per-topic throttled-replica
 * properties ({@link #LEADER_REPLICAS_PROP}, {@link #FOLLOWER_REPLICAS_PROP}). A config is only
 * written back when at least one of its managed properties actually changed.
 */
public class Throttler implements Throttle {
    private static final Logger logger = LoggerFactory.getLogger(Throttler.class);

    /** Broker-level config key holding the leader replication throttle rate. */
    static final String LEADER_RATE_PROP = DynamicConfig$Broker$.MODULE$.LeaderReplicationThrottledRateProp();
    /** Broker-level config key holding the follower replication throttle rate. */
    static final String FOLLOWER_RATE_PROP = DynamicConfig$Broker$.MODULE$.FollowerReplicationThrottledRateProp();
    /** Topic-level config key listing the throttled leader replicas. */
    static final String LEADER_REPLICAS_PROP = LogConfig.LeaderReplicationThrottledReplicasProp();
    /** Topic-level config key listing the throttled follower replicas. */
    static final String FOLLOWER_REPLICAS_PROP = LogConfig.FollowerReplicationThrottledReplicasProp();

    private final ZkUtils zkUtils;
    private final AdminUtilities admin;

    /**
     * Creates a throttler that uses a freshly constructed {@code KafkaAdmin} for config access.
     *
     * @param zkUtils handle passed to every config read/write and ZooKeeper lookup
     */
    public Throttler(ZkUtils zkUtils) {
        this(zkUtils, new KafkaAdmin());
    }

    /**
     * Creates a throttler with an explicitly supplied admin implementation.
     *
     * @param zkUtils        handle passed to every config read/write and ZooKeeper lookup
     * @param adminUtilities used to fetch and change broker/topic entity configs
     */
    public Throttler(ZkUtils zkUtils, AdminUtilities adminUtilities) {
        this.zkUtils = zkUtils;
        this.admin = adminUtilities;
    }

    /**
     * Engages the throttle for a move between two assignments.
     *
     * <p>If the ZooKeeper path {@code ZkUtils.ReassignPartitionsPath()} exists (presumably a
     * reassignment is already in flight - confirm), only the broker rates are updated. Otherwise
     * every existing throttle is removed first and then rates and throttled replicas are applied.
     *
     * @param j                  throttle rate written to the broker rate properties
     *                           (units not visible here; presumably bytes/sec - confirm)
     * @param replicaAssignment  first assignment handed to {@code AssignmentPair}
     * @param replicaAssignment2 second assignment handed to {@code AssignmentPair}
     */
    @Override
    public void engage(long j, ReplicaAssignment replicaAssignment, ReplicaAssignment replicaAssignment2) {
        AssignmentPair assignments = new AssignmentPair(replicaAssignment, replicaAssignment2);
        boolean reassignPathPresent = this.zkUtils.pathExists(ZkUtils.ReassignPartitionsPath());
        if (!reassignPathPresent) {
            // Start from a clean slate before applying the new throttle.
            disengage();
            engage(j, assignments);
            return;
        }
        limit(j, assignments.brokers());
    }

    /**
     * Applies the rate to the pair's brokers, then marks the pair's moving replicas as throttled.
     *
     * @param j              throttle rate written to the broker rate properties
     * @param assignmentPair source of the brokers, topics and per-topic move sources/destinations
     */
    void engage(long j, AssignmentPair assignmentPair) {
        limit(j, assignmentPair.brokers());
        throttleReplicas(assignmentPair);
    }

    /**
     * Removes the throttled-replica properties from all topics and the rate properties from all
     * brokers. Both removals are always attempted.
     *
     * @return {@code true} if at least one topic or broker config was changed
     */
    @Override
    public boolean disengage() {
        boolean replicasCleared = removeReplicas();
        boolean ratesCleared = removeRates();
        return replicasCleared || ratesCleared;
    }

    /**
     * Sets both rate properties to {@code rate} on each given broker, writing the broker config
     * back only when one of the two values differs from what is stored.
     */
    private void limit(long rate, Collection<Broker> brokers) {
        String rateValue = String.valueOf(rate);
        for (Broker broker : brokers) {
            Properties config = copyOf(
                    this.admin.fetchEntityConfig(this.zkUtils, ConfigType.Broker(), String.valueOf(broker.id())));
            boolean leaderRateChanged = updateIfChanged(rateValue, config, LEADER_RATE_PROP);
            boolean followerRateChanged = updateIfChanged(rateValue, config, FOLLOWER_RATE_PROP);
            if (!leaderRateChanged && !followerRateChanged) {
                continue;
            }
            this.admin.changeBrokerConfig(this.zkUtils, toSeq(broker.id()), config);
            logger.info("Updated broker config for broker [{}] to be [{}]", broker, config);
        }
    }

    /**
     * For every topic in the pair, stores the formatted move sources under
     * {@link #LEADER_REPLICAS_PROP} and the formatted move destinations under
     * {@link #FOLLOWER_REPLICAS_PROP}, writing the topic config back only on change.
     */
    private void throttleReplicas(AssignmentPair assignments) {
        for (String topic : assignments.topics()) {
            String leaderReplicas = format(assignments.moveSources(topic));
            String followerReplicas = format(assignments.moveDestinations(topic));
            Properties config = copyOf(this.admin.fetchEntityConfig(this.zkUtils, ConfigType.Topic(), topic));
            boolean leadersChanged = updateIfChanged(leaderReplicas, config, LEADER_REPLICAS_PROP);
            boolean followersChanged = updateIfChanged(followerReplicas, config, FOLLOWER_REPLICAS_PROP);
            if (!leadersChanged && !followersChanged) {
                continue;
            }
            this.admin.changeTopicConfig(this.zkUtils, topic, config);
            logger.info("Updated topic config for topic [{}] to be [{}]", topic, config);
        }
    }

    /**
     * Strips both throttled-replica properties from every topic returned by
     * {@code zkUtils.getAllTopics()}.
     *
     * @return {@code true} if any topic config was rewritten
     */
    private boolean removeReplicas() {
        boolean anyChanged = false;
        for (String topic : JavaConversions.asJavaCollection(this.zkUtils.getAllTopics())) {
            Properties config = copyOf(this.admin.fetchEntityConfig(this.zkUtils, ConfigType.Topic(), topic));
            // Remove both keys unconditionally; each result records whether the key was present.
            boolean hadLeaderReplicas = config.remove(LEADER_REPLICAS_PROP) != null;
            boolean hadFollowerReplicas = config.remove(FOLLOWER_REPLICAS_PROP) != null;
            if (!hadLeaderReplicas && !hadFollowerReplicas) {
                continue;
            }
            this.admin.changeTopicConfig(this.zkUtils, topic, config);
            anyChanged = true;
        }
        return anyChanged;
    }

    /**
     * Strips both rate properties from every broker returned by
     * {@code zkUtils.getAllBrokersInCluster()}.
     *
     * @return {@code true} if any broker config was rewritten
     */
    private boolean removeRates() {
        boolean anyChanged = false;
        for (kafka.cluster.Broker broker : JavaConversions.asJavaCollection(this.zkUtils.getAllBrokersInCluster())) {
            Properties config = copyOf(
                    this.admin.fetchEntityConfig(this.zkUtils, ConfigType.Broker(), String.valueOf(broker.id())));
            // Remove both keys unconditionally; each result records whether the key was present.
            boolean hadLeaderRate = config.remove(LEADER_RATE_PROP) != null;
            boolean hadFollowerRate = config.remove(FOLLOWER_RATE_PROP) != null;
            if (!hadLeaderRate && !hadFollowerRate) {
                continue;
            }
            this.admin.changeBrokerConfig(this.zkUtils, toSeq(broker.id()), config);
            anyChanged = true;
        }
        return anyChanged;
    }

    /**
     * Stores {@code value} under {@code key} when it differs from the currently stored value.
     *
     * @return {@code true} if the property was overwritten, {@code false} if it already matched
     */
    private boolean updateIfChanged(String value, Properties config, String key) {
        String current = config.getProperty(key);
        boolean alreadySet;
        if (value == null) {
            alreadySet = current == null;
        } else {
            alreadySet = value.equals(current);
        }
        if (alreadySet) {
            return false;
        }
        config.put(key, value);
        return true;
    }

    /**
     * Renders the map as comma-separated {@code partition:replica} pairs, one pair per replica id
     * of each partition, in the map's iteration order. An empty map yields an empty string.
     */
    private String format(Map<TopicPartition, List<Integer>> replicasByPartition) {
        StringBuilder joined = new StringBuilder();
        String separator = "";
        for (Map.Entry<TopicPartition, List<Integer>> entry : replicasByPartition.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            for (int replicaId : entry.getValue()) {
                joined.append(separator)
                        .append(topicPartition.partition())
                        .append(':')
                        .append(replicaId);
                separator = ",";
            }
        }
        return joined.toString();
    }

    /** Returns a new {@link Properties} populated via {@code putAll} from the given one. */
    private Properties copyOf(Properties source) {
        Properties duplicate = new Properties();
        duplicate.putAll(source);
        return duplicate;
    }

    /** Wraps a single id in a one-element Scala {@code Seq}. */
    private Seq<Object> toSeq(int id) {
        List<Object> single = Arrays.<Object>asList(Integer.valueOf(id));
        return JavaConversions.asScalaBuffer(single).toSeq();
    }
}
