package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.cruisecontrol.common.utils.Utils;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.admin.AdminOperationException;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.class */
public class ReplicationThrottleHelper {
    // Class logger; assigned in the static initializer at the bottom of this class.
    private static final Logger LOG;
    // Config names for the leader/follower throttle rates and throttled-replica lists.
    // All four are assigned in the static initializer from the corresponding KafkaConfig accessors.
    static final String LEADER_THROTTLED_RATE;
    static final String FOLLOWER_THROTTLED_RATE;
    static final String LEADER_THROTTLED_REPLICAS;
    static final String FOLLOWER_THROTTLED_REPLICAS;
    // Admin client used for every describe/alter config call made by this helper.
    private Admin adminClient;
    // Throttle rate in bytes/sec (per the log messages below). May be null, in which case
    // throttlingEnabled() reports false. Overwritten by doSetThrottles when auto-throttle is on.
    Long throttleRate;
    // When false, setThrottledRateIfUnset leaves non-empty STATIC_BROKER_CONFIG rates untouched.
    private boolean overrideStaticThrottleRate;
    // True when the configured/assigned rate equals KafkaCruiseControlConfig.AUTO_THROTTLE.
    boolean autoThrottleEnabled;
    // Cruise Control configuration this helper reads its settings from.
    KafkaCruiseControlConfig config;
    // Decompiled assertion switch: set to !desiredAssertionStatus() in the static initializer,
    // so the guarded checks in this class only run when assertions are enabled.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReplicationThrottleHelper(Admin admin, KafkaCruiseControlConfig kafkaCruiseControlConfig) {
        this.adminClient = admin;
        this.config = kafkaCruiseControlConfig;
        this.throttleRate = this.config.getLong(KafkaCruiseControlConfig.REPLICATION_THROTTLE_CONFIG);
        this.autoThrottleEnabled = this.throttleRate.longValue() == KafkaCruiseControlConfig.AUTO_THROTTLE;
        this.overrideStaticThrottleRate = this.config.getBoolean(KafkaCruiseControlConfig.OVERRIDE_STATIC_THROTTLES_CONFIG).booleanValue();
        LOG.info("Set throttle rate {}. Will " + (this.overrideStaticThrottleRate ? "" : "not") + " override static throttles when setting the rate.", this.throttleRate);
    }

    /**
     * Returns the throttle rate as seen by callers.
     *
     * @return {@code KafkaCruiseControlConfig.AUTO_THROTTLE} while auto-throttling is enabled,
     *         otherwise the stored throttle rate (possibly {@code null})
     */
    Long getThrottleRate() {
        if (this.autoThrottleEnabled) {
            return Long.valueOf(KafkaCruiseControlConfig.AUTO_THROTTLE);
        }
        return this.throttleRate;
    }

    /**
     * Replaces the stored throttle rate.
     *
     * <p>A non-null value also recomputes the auto-throttle flag; a {@code null} value leaves
     * that flag as it was and only clears the stored rate.
     *
     * @param l new throttle rate, or {@code null}
     */
    public void setThrottleRate(Long l) {
        if (l == null) {
            // Auto-throttle flag intentionally left untouched, matching existing behaviour.
            this.throttleRate = null;
            return;
        }
        this.autoThrottleEnabled = (l.longValue() == KafkaCruiseControlConfig.AUTO_THROTTLE);
        this.throttleRate = l;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Applies rebalance throttles for the given proposals if throttling is enabled.
     *
     * @param list proposals whose brokers and topics should be throttled
     * @param loadMonitor used to compute the rate when auto-throttling is enabled
     * @param set broker ids that are skipped when fetching and setting broker configs
     * @throws InterruptedException if interrupted while applying the throttles (logged, then rethrown)
     * @throws RuntimeException wrapping any other exception raised while applying the throttles
     */
    public void setThrottles(List<ExecutionProposal> list, LoadMonitor loadMonitor, Set<Integer> set) throws InterruptedException {
        if (throttlingEnabled()) {
            try {
                doSetThrottles(list, loadMonitor, set);
            } catch (InterruptedException interrupted) {
                LOG.error("Interrupted while setting rebalance throttle.", interrupted);
                throw interrupted;
            } catch (Exception unexpected) {
                LOG.error("Unexpected exception while setting rebalance throttle.", unexpected);
                throw new RuntimeException(unexpected);
            }
        } else {
            LOG.info("Skipped setting rebalance throttle because it is not enabled");
        }
    }

    /**
     * Sets broker-level throttle rates and topic-level throttled-replica lists for the proposals.
     *
     * <p>Steps, in order: refresh the rate from the load monitor when auto-throttling is on;
     * collect the participating brokers minus the ones in {@code set}; fetch their configs;
     * set leader/follower rates on each broker whose config could be fetched; finally set
     * leader and follower throttled replicas on every affected topic.
     *
     * @param list proposals being executed
     * @param loadMonitor source of the computed throttle when auto-throttling is enabled
     * @param set broker ids to exclude from config fetching and rate setting
     * @throws ExecutionException if fetching broker configs fails for a non-timeout reason
     * @throws InterruptedException if interrupted while waiting for broker configs
     */
    private void doSetThrottles(List<ExecutionProposal> list, LoadMonitor loadMonitor, Set<Integer> set) throws ExecutionException, InterruptedException {
        if (this.autoThrottleEnabled) {
            this.throttleRate = Long.valueOf(loadMonitor.computeThrottle());
        }
        Set<Integer> participatingBrokers = getParticipatingBrokers(list);
        // Typed copy: the raw TreeSet used previously cannot be iterated as Integer.
        Set<Integer> brokersToThrottle = new TreeSet<>(participatingBrokers);
        brokersToThrottle.removeAll(set);
        if (!set.isEmpty()) {
            LOG.info("Skipping fetching configs for brokers currently being removed: {}", set);
        }
        Map<ConfigResource, Config> brokerConfigs = fetchBrokerConfigs(brokersToThrottle);
        Set<Integer> staticallyThrottledBrokers = filterBrokersWithStaticThrottles(brokerConfigs);
        Map<String, Set<String>> throttledReplicasByTopic = getThrottledReplicasByTopic(list, staticallyThrottledBrokers);
        LOG.info("Setting a rebalance throttle of {} bytes/sec to {} brokers and {} topics. Following Brokers already have static throttles set: {}",
                this.throttleRate,
                Utils.join(participatingBrokers, ", "),
                Utils.join(throttledReplicasByTopic.keySet(), ", "),
                Utils.join(staticallyThrottledBrokers, ", "));
        for (Integer brokerId : brokersToThrottle) {
            Config brokerConfig = brokerConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)));
            if (brokerConfig == null) {
                // fetchBrokerConfigs omits brokers whose describe call timed out.
                LOG.warn("Skipping setting throttle rates on broker {} as it seems to be offline.", brokerId);
            } else {
                setLeaderThrottledRateIfUnset(brokerId.intValue(), brokerConfig);
                setFollowerThrottledRateIfUnset(brokerId.intValue(), brokerConfig);
            }
        }
        throttledReplicasByTopic.forEach(this::setLeaderThrottledReplicas);
        throttledReplicasByTopic.forEach(this::setFollowerThrottledReplicas);
    }

    /**
     * Describes the configs of the given brokers through the admin client.
     *
     * <p>A broker whose describe future fails with a {@link TimeoutException} cause is logged
     * and left out of the result; any other failure is rethrown.
     *
     * @param set ids of the brokers to describe
     * @return fetched configs keyed by broker config resource
     * @throws ExecutionException if a describe future fails for a reason other than a timeout
     * @throws InterruptedException if interrupted while waiting on a describe future
     */
    private Map<ConfigResource, Config> fetchBrokerConfigs(Set<Integer> set) throws ExecutionException, InterruptedException {
        LOG.info("Fetching configs for brokers {}", set);
        List<ConfigResource> brokerResources = new ArrayList<>(set.size());
        for (Integer brokerId : set) {
            brokerResources.add(new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)));
        }
        int timeoutMs = this.config.getInt(KafkaCruiseControlConfig.DESCRIBE_CONFIGS_RESPONSE_TIMEOUT_MS_CONFIG).intValue();
        DescribeConfigsOptions describeOptions = new DescribeConfigsOptions().timeoutMs(Integer.valueOf(timeoutMs));
        Map<ConfigResource, KafkaFuture<Config>> pendingConfigs =
                this.adminClient.describeConfigs(brokerResources, describeOptions).values();

        Map<ConfigResource, Config> configsByBroker = new HashMap<>();
        for (Map.Entry<ConfigResource, KafkaFuture<Config>> pending : pendingConfigs.entrySet()) {
            ConfigResource resource = pending.getKey();
            String brokerName = resource.name();
            try {
                configsByBroker.put(resource, pending.getValue().get());
            } catch (ExecutionException failure) {
                boolean timedOut = failure.getCause() instanceof TimeoutException;
                if (!timedOut) {
                    throw failure;
                }
                LOG.warn("Could not fetch broker configs for broker {} when setting replication throttles. This could be because the broker is offline. Ignoring it.", brokerName);
            }
        }
        return configsByBroker;
    }

    /**
     * Selects the brokers whose leader AND follower throttled-replicas configs are both "*".
     *
     * @param map broker configs keyed by broker config resource (resource name is the broker id)
     * @return ids of the brokers matching the wildcard condition
     */
    private Set<Integer> filterBrokersWithStaticThrottles(Map<ConfigResource, Config> map) {
        return map.entrySet().stream()
                .filter(brokerEntry -> {
                    Config brokerConfig = brokerEntry.getValue();
                    return isWildcardThrottle(brokerConfig.get(LEADER_THROTTLED_REPLICAS))
                            && isWildcardThrottle(brokerConfig.get(FOLLOWER_THROTTLED_REPLICAS));
                })
                .map(brokerEntry -> Integer.valueOf(Integer.parseInt(brokerEntry.getKey().name())))
                .collect(Collectors.toSet());
    }

    /** Returns true when the entry exists and its value is exactly "*". */
    private static boolean isWildcardThrottle(ConfigEntry entry) {
        return entry != null && "*".equals(entry.value());
    }

    /**
     * Tells whether the throttle for a task may be removed.
     *
     * @param executionTask task to inspect
     * @return true only for inter-broker replica actions that are neither in progress nor pending
     */
    boolean shouldRemoveThrottleForTask(ExecutionTask executionTask) {
        ExecutionTask.State state = executionTask.state();
        if (state == ExecutionTask.State.IN_PROGRESS || state == ExecutionTask.State.PENDING) {
            return false;
        }
        return executionTask.type() == ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION;
    }

    /**
     * Tells whether a task is an inter-broker replica action that is currently in progress.
     *
     * @param executionTask task to inspect
     * @return true when the state is IN_PROGRESS and the type is INTER_BROKER_REPLICA_ACTION
     */
    boolean taskIsInProgress(ExecutionTask executionTask) {
        if (executionTask.state() != ExecutionTask.State.IN_PROGRESS) {
            return false;
        }
        return executionTask.type() == ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes throttles that belong to tasks which no longer need them.
     *
     * <p>Broker rates are removed from the brokers of removable tasks, except brokers still
     * used by in-progress tasks and brokers listed in {@code set}. Throttled replicas are
     * removed from the topics of all removable tasks. Does nothing when throttling is disabled.
     *
     * @param list tasks that are candidates for throttle removal
     * @param list2 tasks checked for in-progress inter-broker moves; their brokers keep the rate
     * @param set broker ids that must never have their rate removed here
     */
    public void clearThrottles(List<ExecutionTask> list, List<ExecutionTask> list2, Set<Integer> set) {
        if (!throttlingEnabled()) {
            return;
        }
        List<ExecutionProposal> finishedProposals = list.stream()
                .filter(this::shouldRemoveThrottleForTask)
                .map(ExecutionTask::proposal)
                .collect(Collectors.toList());
        Set<Integer> brokersToClear = getParticipatingBrokers(finishedProposals);

        List<ExecutionProposal> ongoingProposals = list2.stream()
                .filter(this::taskIsInProgress)
                .map(ExecutionTask::proposal)
                .collect(Collectors.toList());
        brokersToClear.removeAll(getParticipatingBrokers(ongoingProposals));
        brokersToClear.removeAll(set);

        LOG.info("Removing replica movement throttles from brokers in the cluster: {}", brokersToClear);
        for (Integer brokerId : brokersToClear) {
            removeThrottledRateFromBroker(brokerId);
        }
        Map<String, Set<String>> replicasByTopic = getThrottledReplicasByTopic(finishedProposals, Collections.emptySet());
        replicasByTopic.forEach(this::removeThrottledReplicasFromTopic);
    }

    /**
     * @return true when a throttle rate is stored and it differs from
     *         {@code ConfluentConfigs.BALANCER_THROTTLE_NO_THROTTLE}
     */
    private boolean throttlingEnabled() {
        if (this.throttleRate == null) {
            return false;
        }
        return !ConfluentConfigs.BALANCER_THROTTLE_NO_THROTTLE.equals(this.throttleRate);
    }

    /**
     * Collects the ids of every broker appearing as an old or new replica in the proposals.
     *
     * @param list proposals to scan
     * @return a new, mutable, sorted set of broker ids
     */
    private Set<Integer> getParticipatingBrokers(List<ExecutionProposal> list) {
        Set<Integer> brokerIds = new TreeSet<>();
        for (ExecutionProposal proposal : list) {
            proposal.oldReplicas().forEach(replica -> brokerIds.add(replica.brokerId()));
            proposal.newReplicas().forEach(replica -> brokerIds.add(replica.brokerId()));
        }
        return brokerIds;
    }

    /**
     * Builds, per topic, the sorted set of "partition:broker" entries to throttle.
     *
     * <p>For each proposal the brokers of its old replicas and of its replicas-to-add are
     * considered. A proposal is skipped entirely when all of those brokers are contained in
     * {@code set}.
     *
     * @param list proposals to scan
     * @param set broker ids that cause a proposal to be skipped when they cover all its brokers
     * @return map from topic name to "partitionId:brokerId" entries
     */
    private Map<String, Set<String>> getThrottledReplicasByTopic(List<ExecutionProposal> list, Set<Integer> set) {
        Map<String, Set<String>> replicasByTopic = new HashMap<>();
        for (ExecutionProposal proposal : list) {
            List<Integer> involvedBrokers = Stream.concat(
                    proposal.oldReplicas().stream().map(replica -> replica.brokerId()),
                    proposal.replicasToAdd().stream().map(replica -> replica.brokerId()))
                    .collect(Collectors.toList());
            if (set.containsAll(involvedBrokers)) {
                continue;
            }
            int partitionId = proposal.partitionId();
            Set<String> topicReplicas = replicasByTopic.computeIfAbsent(proposal.topic(), topic -> new TreeSet<>());
            for (Integer brokerId : involvedBrokers) {
                topicReplicas.add(partitionId + ":" + brokerId);
            }
        }
        return replicasByTopic;
    }

    /**
     * Delegates to {@code setThrottledRateIfUnset} for the leader throttle rate config.
     *
     * @param i broker id
     * @param config current config of that broker
     */
    private void setLeaderThrottledRateIfUnset(int i, Config config) {
        setThrottledRateIfUnset(i, config, LEADER_THROTTLED_RATE);
    }

    /**
     * Delegates to {@code setThrottledRateIfUnset} for the follower throttle rate config.
     *
     * @param i broker id
     * @param config current config of that broker
     */
    private void setFollowerThrottledRateIfUnset(int i, Config config) {
        setThrottledRateIfUnset(i, config, FOLLOWER_THROTTLED_RATE);
    }

    /**
     * Sets one throttle rate config on a broker unless a rate is already in place.
     *
     * <p>The rate is left alone when (a) static overrides are disabled and the broker has a
     * non-empty value from {@code STATIC_BROKER_CONFIG}, or (b) the config entry is present
     * and not a default. Otherwise the current throttle rate is written with a SET operation.
     *
     * @param i broker id
     * @param config current config of that broker
     * @param str config name; expected to be the leader or follower throttled rate name
     */
    private void setThrottledRateIfUnset(int i, Config config, String str) {
        // Assertion-only sanity checks (active only when assertions are enabled for this class).
        if (!$assertionsDisabled && this.throttleRate == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !(str.equals(LEADER_THROTTLED_RATE) || str.equals(FOLLOWER_THROTTLED_RATE))) {
            throw new AssertionError();
        }

        if (!this.overrideStaticThrottleRate) {
            ConfigEntry staticCandidate = config.get(str);
            boolean staticallySet = staticCandidate != null
                    && staticCandidate.source() == ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG
                    && staticCandidate.value() != null
                    && !staticCandidate.value().isEmpty();
            if (staticallySet) {
                LOG.debug("Not setting {} for broker {} because pre-existing throttle of {} was already set statically.",
                        str, Integer.valueOf(i), staticCandidate.value());
                return;
            }
        }

        Optional<ConfigEntry> existing = KafkaCruiseControlUtils.getConfigEntry(config, str);
        boolean alreadySet = existing.isPresent() && !existing.get().isDefault();
        if (alreadySet) {
            LOG.debug("Not setting {} value {} for broker {} because pre-existing throttle of {} was already set",
                    str, this.throttleRate, Integer.valueOf(i), existing.get());
            return;
        }
        LOG.debug("Setting {} to {} bytes/second for broker {}", str, this.throttleRate, Integer.valueOf(i));
        KafkaCruiseControlUtils.setEntityConfigs(
                this.adminClient,
                ConfigResource.Type.BROKER,
                Collections.singleton(String.valueOf(i)),
                AlterConfigOp.OpType.SET,
                Collections.singletonMap(str, String.valueOf(this.throttleRate)));
    }

    /**
     * Delegates to {@code setThrottledReplicas} for the leader throttled-replicas config.
     *
     * @param str topic name
     * @param set "partition:broker" entries to throttle
     */
    private void setLeaderThrottledReplicas(String str, Set<String> set) {
        setThrottledReplicas(str, set, LEADER_THROTTLED_REPLICAS);
    }

    /**
     * Delegates to {@code setThrottledReplicas} for the follower throttled-replicas config.
     *
     * @param str topic name
     * @param set "partition:broker" entries to throttle
     */
    private void setFollowerThrottledReplicas(String str, Set<String> set) {
        setThrottledReplicas(str, set, FOLLOWER_THROTTLED_REPLICAS);
    }

    /**
     * Appends the given replicas to one throttled-replicas config of a topic.
     *
     * <p>An {@code AdminOperationException} or {@code UnknownTopicOrPartitionException} is
     * treated as "topic does not exist": it is logged at info level and not rethrown.
     *
     * @param str topic name
     * @param set "partition:broker" entries, joined with commas into the config value
     * @param str2 config name; expected to be the leader or follower throttled-replicas name
     */
    private void setThrottledReplicas(String str, Set<String> set, String str2) {
        // Assertion-only sanity check (active only when assertions are enabled for this class).
        if (!$assertionsDisabled && !(str2.equals(LEADER_THROTTLED_REPLICAS) || str2.equals(FOLLOWER_THROTTLED_REPLICAS))) {
            throw new AssertionError();
        }
        final String replicaList = String.join(",", set);
        try {
            KafkaCruiseControlUtils.setEntityConfigs(
                    this.adminClient,
                    ConfigResource.Type.TOPIC,
                    Collections.singleton(str),
                    AlterConfigOp.OpType.APPEND,
                    Collections.singletonMap(str2, replicaList));
        } catch (AdminOperationException | UnknownTopicOrPartitionException missingTopic) {
            LOG.info("Not setting throttled replicas {} for topic {} because it does not exist", replicaList, str);
        }
    }

    /**
     * Removes the given entries from a comma-separated throttled-replicas config value.
     *
     * <p>The value is split on commas, every element contained in {@code set} is dropped, and
     * the remaining elements are re-joined with commas in their original order.
     *
     * @param str comma-separated config value; must not be null
     * @param set entries to remove; must not be null
     * @return the remaining entries joined with commas (empty string when nothing remains)
     * @throws NullPointerException if {@code str} or {@code set} is null
     */
    static String removeReplicasFromConfig(String str, Set<String> set) {
        List<String> remaining = new ArrayList<>(Arrays.asList(str.split(",")));
        // A bound method reference evaluates its receiver eagerly, so a null set still fails fast.
        remaining.removeIf(set::contains);
        return String.join(",", remaining);
    }

    /**
     * Computes the leader throttled-replicas value of a topic after removing some replicas.
     *
     * @param config current topic config
     * @param str topic name (used for logging only)
     * @param set "partition:broker" entries to remove
     * @return the new config value, or empty when the entry is absent or a default
     */
    private Optional<String> removeLeaderThrottledReplicasFromTopic(Config config, String str, Set<String> set) {
        Optional<ConfigEntry> leaderEntry = KafkaCruiseControlUtils.getConfigEntry(config, LEADER_THROTTLED_REPLICAS);
        if (leaderEntry.isPresent() && !leaderEntry.get().isDefault()) {
            LOG.debug("Removing leader throttles for topic {} for replicas {}", str, set);
            String remaining = removeReplicasFromConfig(leaderEntry.get().value(), set);
            return Optional.of(remaining);
        }
        return Optional.empty();
    }

    /**
     * Computes the follower throttled-replicas value of a topic after removing some replicas.
     *
     * @param config current topic config
     * @param str topic name (used for logging only)
     * @param set "partition:broker" entries to remove
     * @return the new config value, or empty when the entry is absent or a default
     */
    private Optional<String> removeFollowerThrottledReplicasFromTopic(Config config, String str, Set<String> set) {
        Optional<ConfigEntry> followerEntry = KafkaCruiseControlUtils.getConfigEntry(config, FOLLOWER_THROTTLED_REPLICAS);
        if (followerEntry.isPresent() && !followerEntry.get().isDefault()) {
            LOG.debug("Removing follower throttles for topic {} and replicas {}", str, set);
            String remaining = removeReplicasFromConfig(followerEntry.get().value(), set);
            return Optional.of(remaining);
        }
        return Optional.empty();
    }

    /**
     * Removes the given replicas from a topic's leader and follower throttled-replicas configs.
     *
     * <p>For each of the two configs that is present and non-default, the new value is SET, or
     * the config is DELETEd when no replicas remain. An {@code AdminOperationException} or
     * {@code UnknownTopicOrPartitionException} is logged as a warning and not rethrown.
     *
     * @param str topic name
     * @param set "partition:broker" entries to remove
     */
    private void removeThrottledReplicasFromTopic(String str, Set<String> set) {
        try {
            Config topicConfig = KafkaCruiseControlUtils.getEntityConfigs(this.adminClient, ConfigResource.Type.TOPIC, str);
            List<AlterConfigOp> alterOps = new ArrayList<>(2);

            Optional<String> leaderRemainder = removeLeaderThrottledReplicasFromTopic(topicConfig, str, set);
            if (leaderRemainder.isPresent()) {
                alterOps.add(throttledReplicasOp(LEADER_THROTTLED_REPLICAS, leaderRemainder.get()));
            }
            Optional<String> followerRemainder = removeFollowerThrottledReplicasFromTopic(topicConfig, str, set);
            if (followerRemainder.isPresent()) {
                alterOps.add(throttledReplicasOp(FOLLOWER_THROTTLED_REPLICAS, followerRemainder.get()));
            }

            KafkaCruiseControlUtils.setEntityConfigs(this.adminClient, ConfigResource.Type.TOPIC, Collections.singleton(str), alterOps);
        } catch (AdminOperationException | UnknownTopicOrPartitionException e) {
            LOG.warn("Skip removing throttled replicas {} for topic {} due to error {}", set, str, e);
        }
    }

    /** Builds a DELETE op when no replicas remain for the config, otherwise a SET op with the remainder. */
    private static AlterConfigOp throttledReplicasOp(String configName, String remainingReplicas) {
        AlterConfigOp.OpType opType = remainingReplicas.isEmpty() ? AlterConfigOp.OpType.DELETE : AlterConfigOp.OpType.SET;
        return new AlterConfigOp(new ConfigEntry(configName, remainingReplicas), opType);
    }

    /**
     * Deletes the leader and follower throttle rate configs from one broker.
     *
     * @param num broker id
     * @throws RuntimeException if {@code num} is null
     */
    private void removeThrottledRateFromBroker(Integer num) {
        if (num == null) {
            throw new RuntimeException("Cannot remove throttle for null broker id.");
        }
        LOG.debug("Removing leader and follower throttle on broker {}", num);
        // Values are irrelevant for a DELETE; only the config names matter.
        Map<String, String> rateConfigs = new HashMap<>(2);
        rateConfigs.put(LEADER_THROTTLED_RATE, null);
        rateConfigs.put(FOLLOWER_THROTTLED_RATE, null);
        KafkaCruiseControlUtils.setEntityConfigs(
                this.adminClient,
                ConfigResource.Type.BROKER,
                Collections.singleton(String.valueOf(num)),
                AlterConfigOp.OpType.DELETE,
                rateConfigs);
    }

    /**
     * Sets or removes the leader/follower throttle rate on every broker that needs a change.
     *
     * <p>With a non-null rate, a broker is updated when either rate config is missing, has a
     * null value, or differs from the requested rate. With a null rate, a broker is updated
     * (configs DELETEd) when either rate config is present, non-default and non-blank.
     *
     * @param l rate to set, or {@code null} to remove the rate configs
     * @return number of brokers whose configs were altered
     */
    public int updateOrRemoveBrokerThrottleRate(Long l) {
        final String desiredRate = (l == null) ? null : Long.toString(l.longValue());
        final List<String> brokersToAlter = new ArrayList<>();

        Collection<String> allBrokerNames = KafkaCruiseControlUtils.getAllBrokersInCluster(this.adminClient).stream()
                .map(brokerId -> String.valueOf(brokerId))
                .collect(Collectors.toList());
        KafkaCruiseControlUtils.getEntityConfigs(this.adminClient, ConfigResource.Type.BROKER, allBrokerNames)
                .forEach((configResource, config) -> {
                    Optional<ConfigEntry> leaderRate = KafkaCruiseControlUtils.getConfigEntry(config, LEADER_THROTTLED_RATE);
                    Optional<ConfigEntry> followerRate = KafkaCruiseControlUtils.getConfigEntry(config, FOLLOWER_THROTTLED_RATE);
                    boolean leaderNeedsChange = throttleRateNeedsChange(leaderRate, desiredRate);
                    boolean followerNeedsChange = throttleRateNeedsChange(followerRate, desiredRate);
                    if (leaderNeedsChange || followerNeedsChange) {
                        LOG.debug("Setting throttle to {} bytes/second for broker {}", l, configResource);
                        brokersToAlter.add(configResource.name());
                    } else {
                        LOG.debug("Not setting throttle {} for broker {} because pre-existing throttle is same. Leader rate: {}, follower rate:{}",
                                l, configResource, leaderRate, followerRate);
                    }
                });

        if (!brokersToAlter.isEmpty()) {
            Map<String, String> rateConfigs = new HashMap<>(2);
            rateConfigs.put(LEADER_THROTTLED_RATE, desiredRate);
            rateConfigs.put(FOLLOWER_THROTTLED_RATE, desiredRate);
            AlterConfigOp.OpType opType = (l == null) ? AlterConfigOp.OpType.DELETE : AlterConfigOp.OpType.SET;
            KafkaCruiseControlUtils.setEntityConfigs(this.adminClient, ConfigResource.Type.BROKER, brokersToAlter, opType, rateConfigs);
            LOG.debug("Setting throttle to {} bytes/second for brokers {}", rateConfigs, brokersToAlter);
        }
        return brokersToAlter.size();
    }

    /**
     * Decides whether a single rate config must be altered to reach the desired state.
     * A null {@code desiredRate} means "remove": only present, non-default, non-blank entries qualify.
     * Otherwise the entry qualifies when it is absent, has a null value, or differs from the desired rate.
     */
    private static boolean throttleRateNeedsChange(Optional<ConfigEntry> entry, String desiredRate) {
        if (desiredRate == null) {
            if (!entry.isPresent() || entry.get().isDefault()) {
                return false;
            }
            String currentValue = entry.get().value();
            return currentValue != null && !currentValue.trim().isEmpty();
        }
        String currentValue = entry.map(ConfigEntry::value).orElse(null);
        return currentValue == null || !currentValue.equals(desiredRate);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Removes every replication throttle this helper can find in the cluster.
     *
     * <p>Topics whose leader or follower throttled-replicas config is present and non-default
     * get both configs DELETEd. Afterwards the broker throttle rates are removed through
     * {@code updateOrRemoveBrokerThrottleRate(null)}.
     */
    public void removeAllThrottles() {
        Collection<String> allTopicsInCluster = KafkaCruiseControlUtils.getAllTopicsInCluster(this.adminClient);
        Map<ConfigResource, Config> topicConfigs =
                KafkaCruiseControlUtils.getEntityConfigs(this.adminClient, ConfigResource.Type.TOPIC, allTopicsInCluster);

        List<String> throttledTopics = new ArrayList<>();
        topicConfigs.forEach((configResource, config) -> {
            Optional<ConfigEntry> leaderReplicas = KafkaCruiseControlUtils.getConfigEntry(config, LEADER_THROTTLED_REPLICAS);
            Optional<ConfigEntry> followerReplicas = KafkaCruiseControlUtils.getConfigEntry(config, FOLLOWER_THROTTLED_REPLICAS);
            boolean leaderSet = leaderReplicas.isPresent() && !leaderReplicas.get().isDefault();
            boolean followerSet = followerReplicas.isPresent() && !followerReplicas.get().isDefault();
            if (leaderSet || followerSet) {
                throttledTopics.add(configResource.name());
            }
        });

        if (!throttledTopics.isEmpty()) {
            // Values are irrelevant for a DELETE; only the config names matter.
            Map<String, String> replicaConfigs = new HashMap<>(2);
            replicaConfigs.put(LEADER_THROTTLED_REPLICAS, null);
            replicaConfigs.put(FOLLOWER_THROTTLED_REPLICAS, null);
            KafkaCruiseControlUtils.setEntityConfigs(
                    this.adminClient, ConfigResource.Type.TOPIC, throttledTopics, AlterConfigOp.OpType.DELETE, replicaConfigs);
        }
        // Report only the topics that were actually altered, not every topic in the cluster.
        LOG.info("Removed throttled replicas config for topics: {}", throttledTopics);
        LOG.info("Removed throttle rate config from {} brokers", Integer.valueOf(updateOrRemoveBrokerThrottleRate(null)));
    }

    // Static initializer: assigns the blank static finals declared at the top of the class.
    static {
        // Assertion switch used by the guarded checks in this class; true when assertions are off.
        $assertionsDisabled = !ReplicationThrottleHelper.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(ReplicationThrottleHelper.class);
        // Config names are taken from KafkaConfig rather than hard-coded here.
        LEADER_THROTTLED_RATE = KafkaConfig.LeaderReplicationThrottledRateProp();
        FOLLOWER_THROTTLED_RATE = KafkaConfig.FollowerReplicationThrottledRateProp();
        LEADER_THROTTLED_REPLICAS = KafkaConfig.LeaderReplicationThrottledReplicasProp();
        FOLLOWER_THROTTLED_REPLICAS = KafkaConfig.FollowerReplicationThrottledReplicasProp();
    }
}
