package io.confluent.kafka.databalancing.throttle;

import io.confluent.kafka.databalancing.AlterTopicConfigsEntry;
import io.confluent.kafka.databalancing.RebalancerAdmin;
import io.confluent.kafka.databalancing.exception.ValidationException;
import io.confluent.kafka.databalancing.topology.Broker;
import io.confluent.kafka.databalancing.topology.ClusterReassignment;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/databalancing/throttle/Throttler.class */
public class Throttler implements Throttle {
    private static final Logger logger = LoggerFactory.getLogger(Throttler.class);
    static final String LEADER_RATE_PROP = KafkaConfig.LeaderReplicationThrottledRateProp();
    static final String FOLLOWER_RATE_PROP = KafkaConfig.FollowerReplicationThrottledRateProp();
    static final String LEADER_REPLICAS_PROP = KafkaConfig.LeaderReplicationThrottledReplicasProp();
    static final String FOLLOWER_REPLICAS_PROP = KafkaConfig.FollowerReplicationThrottledReplicasProp();
    private final RebalancerAdmin admin;

    public Throttler(RebalancerAdmin rebalancerAdmin) {
        this.admin = rebalancerAdmin;
    }

    @Override // io.confluent.kafka.databalancing.throttle.Throttle
    public void engage(long j, Set<Integer> set, ClusterReassignment clusterReassignment) {
        if (this.admin.reassignPartitionsInProgress()) {
            limit(j, (Collection) clusterReassignment.brokers().stream().filter(broker -> {
                return set.contains(Integer.valueOf(broker.id()));
            }).collect(Collectors.toList()));
        } else {
            disengage();
            limitAndThrottle(j, set, clusterReassignment);
        }
    }

    void limitAndThrottle(long j, Set<Integer> set, ClusterReassignment clusterReassignment) {
        limit(j, (Collection) clusterReassignment.brokers().stream().filter(broker -> {
            return set.contains(Integer.valueOf(broker.id()));
        }).collect(Collectors.toList()));
        updateThrottledReplicas(clusterReassignment, this::replace);
    }

    @Override // io.confluent.kafka.databalancing.throttle.Throttle
    public boolean disengage() {
        return removeReplicas() | removeRates();
    }

    @Override // io.confluent.kafka.databalancing.throttle.Throttle
    public void limit(long j, Collection<Broker> collection) {
        List list = (List) collection.stream().map((v0) -> {
            return v0.id();
        }).collect(Collectors.toList());
        String valueOf = String.valueOf(j);
        Map<Integer, Map<String, Optional<String>>> fetchBrokerConfigs = this.admin.fetchBrokerConfigs(list);
        fetchBrokerConfigs.values().removeIf(map -> {
            return (needsUpdate(map, LEADER_RATE_PROP, valueOf) || needsUpdate(map, FOLLOWER_RATE_PROP, valueOf)) ? false : true;
        });
        if (fetchBrokerConfigs.isEmpty()) {
            return;
        }
        HashMap hashMap = new HashMap(2);
        hashMap.put(LEADER_RATE_PROP, valueOf);
        hashMap.put(FOLLOWER_RATE_PROP, valueOf);
        this.admin.alterBrokerConfigs(fetchBrokerConfigs, hashMap);
        Iterator<Integer> it = fetchBrokerConfigs.keySet().iterator();
        while (it.hasNext()) {
            logger.info("Updated broker config for broker [{}] to set [{}={}, {}={}]", new Object[]{it.next(), LEADER_RATE_PROP, valueOf, FOLLOWER_RATE_PROP, valueOf});
        }
    }

    @Override // io.confluent.kafka.databalancing.throttle.Throttle
    public void throttleReplicas(ClusterReassignment clusterReassignment) {
        updateThrottledReplicas(clusterReassignment, this::union);
    }

    @Override // io.confluent.kafka.databalancing.throttle.Throttle
    public void unthrottleReplicas(ClusterReassignment clusterReassignment) {
        updateThrottledReplicas(clusterReassignment, this::diff);
    }

    private void updateThrottledReplicas(ClusterReassignment clusterReassignment, BiFunction<Map<TopicPartition, SortedSet<Integer>>, Map<TopicPartition, SortedSet<Integer>>, SortedMap<TopicPartition, SortedSet<Integer>>> biFunction) {
        TreeMap treeMap = new TreeMap(this.admin.fetchTopicConfigs(clusterReassignment.topics()));
        ArrayList arrayList = new ArrayList();
        for (Map.Entry entry : treeMap.entrySet()) {
            String str = (String) entry.getKey();
            Map map = (Map) entry.getValue();
            String str2 = (String) ((Optional) map.getOrDefault(LEADER_REPLICAS_PROP, Optional.empty())).orElse(null);
            String updatedThrottledReplicasConfig = updatedThrottledReplicasConfig(str, str2, clusterReassignment.moveSources(str), biFunction);
            String str3 = (String) ((Optional) map.getOrDefault(FOLLOWER_REPLICAS_PROP, Optional.empty())).orElse(null);
            String updatedThrottledReplicasConfig2 = updatedThrottledReplicasConfig(str, str3, clusterReassignment.moveDestinations(str), biFunction);
            HashMap hashMap = new HashMap(2);
            if (!Objects.equals(str2, updatedThrottledReplicasConfig)) {
                hashMap.put(LEADER_REPLICAS_PROP, updatedThrottledReplicasConfig.isEmpty() ? null : updatedThrottledReplicasConfig);
            }
            if (!Objects.equals(str3, updatedThrottledReplicasConfig2)) {
                hashMap.put(FOLLOWER_REPLICAS_PROP, updatedThrottledReplicasConfig2.isEmpty() ? null : updatedThrottledReplicasConfig2);
            }
            if (!hashMap.isEmpty()) {
                arrayList.add(new AlterTopicConfigsEntry(str, map, hashMap));
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        this.admin.alterTopicConfigs(arrayList);
    }

    private String updatedThrottledReplicasConfig(String str, String str2, Map<TopicPartition, SortedSet<Integer>> map, BiFunction<Map<TopicPartition, SortedSet<Integer>>, Map<TopicPartition, SortedSet<Integer>>, SortedMap<TopicPartition, SortedSet<Integer>>> biFunction) {
        return format(biFunction.apply(parseThrottledReplicasConfig(str, str2), map));
    }

    private SortedMap<TopicPartition, SortedSet<Integer>> diff(Map<TopicPartition, SortedSet<Integer>> map, Map<TopicPartition, SortedSet<Integer>> map2) {
        TreeMap treeMap = new TreeMap(topicPartitionComparator());
        treeMap.putAll(map);
        for (Map.Entry<TopicPartition, SortedSet<Integer>> entry : map2.entrySet()) {
            ((SortedSet) treeMap.computeIfAbsent(entry.getKey(), topicPartition -> {
                return new TreeSet();
            })).removeAll(entry.getValue());
        }
        return treeMap;
    }

    private SortedMap<TopicPartition, SortedSet<Integer>> union(Map<TopicPartition, SortedSet<Integer>> map, Map<TopicPartition, SortedSet<Integer>> map2) {
        TreeMap treeMap = new TreeMap(topicPartitionComparator());
        treeMap.putAll(map);
        for (Map.Entry<TopicPartition, SortedSet<Integer>> entry : map2.entrySet()) {
            ((SortedSet) treeMap.computeIfAbsent(entry.getKey(), topicPartition -> {
                return new TreeSet();
            })).addAll(entry.getValue());
        }
        return treeMap;
    }

    private SortedMap<TopicPartition, SortedSet<Integer>> replace(Map<TopicPartition, SortedSet<Integer>> map, Map<TopicPartition, SortedSet<Integer>> map2) {
        TreeMap treeMap = new TreeMap(topicPartitionComparator());
        treeMap.putAll(map2);
        return treeMap;
    }

    private boolean removeReplicas() {
        Map<String, Map<String, Optional<String>>> fetchTopicConfigs = this.admin.fetchTopicConfigs(this.admin.getAllTopicsInCluster(true));
        ArrayList arrayList = new ArrayList();
        for (Map.Entry<String, Map<String, Optional<String>>> entry : fetchTopicConfigs.entrySet()) {
            Map<String, Optional<String>> value = entry.getValue();
            if (needsRemoval(value, LEADER_REPLICAS_PROP) || needsRemoval(value, FOLLOWER_REPLICAS_PROP)) {
                HashMap hashMap = new HashMap(2);
                hashMap.put(LEADER_REPLICAS_PROP, null);
                hashMap.put(FOLLOWER_REPLICAS_PROP, null);
                arrayList.add(new AlterTopicConfigsEntry(entry.getKey(), value, hashMap));
            }
        }
        if (arrayList.isEmpty()) {
            return false;
        }
        this.admin.alterTopicConfigs(arrayList);
        return true;
    }

    private boolean removeRates() {
        Map<Integer, Map<String, Optional<String>>> fetchBrokerConfigs = this.admin.fetchBrokerConfigs(this.admin.getAllBrokersInCluster());
        fetchBrokerConfigs.values().removeIf(map -> {
            return (needsRemoval(map, LEADER_RATE_PROP) || needsRemoval(map, FOLLOWER_RATE_PROP)) ? false : true;
        });
        if (fetchBrokerConfigs.isEmpty()) {
            return false;
        }
        HashMap hashMap = new HashMap(2);
        hashMap.put(LEADER_RATE_PROP, null);
        hashMap.put(FOLLOWER_RATE_PROP, null);
        this.admin.alterBrokerConfigs(fetchBrokerConfigs, hashMap);
        Iterator<Integer> it = fetchBrokerConfigs.keySet().iterator();
        while (it.hasNext()) {
            logger.info("Updated broker config for broker [{}] to remove [{}, {}]", new Object[]{it.next(), LEADER_RATE_PROP, FOLLOWER_RATE_PROP});
        }
        return true;
    }

    private String format(SortedMap<TopicPartition, SortedSet<Integer>> sortedMap) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<TopicPartition, SortedSet<Integer>> entry : sortedMap.entrySet()) {
            TopicPartition key = entry.getKey();
            Iterator<Integer> it = entry.getValue().iterator();
            while (it.hasNext()) {
                sb.append(String.format("%s:%s,", Integer.valueOf(key.partition()), Integer.valueOf(it.next().intValue())));
            }
        }
        if (sb.length() > 0) {
            sb.deleteCharAt(sb.length() - 1);
        }
        return sb.toString();
    }

    private Map<TopicPartition, SortedSet<Integer>> parseThrottledReplicasConfig(String str, String str2) {
        if (str2 == null || str2.isEmpty()) {
            return Collections.emptyMap();
        }
        HashMap hashMap = new HashMap();
        for (String str3 : str2.split(",")) {
            String[] split = str3.split(":");
            if (split.length != 2) {
                throw new ValidationException("Illegal throttled replicas found for topic " + str + ": `" + str2 + "`");
            }
            ((SortedSet) hashMap.computeIfAbsent(new TopicPartition(str, Integer.valueOf(Integer.parseInt(split[0])).intValue()), topicPartition -> {
                return new TreeSet();
            })).add(Integer.valueOf(Integer.parseInt(split[1])));
        }
        return hashMap;
    }

    private static boolean needsUpdate(Map<String, Optional<String>> map, String str, String str2) {
        Optional<String> optional = map.get(str);
        return optional == null || ((Boolean) optional.map(str3 -> {
            return Boolean.valueOf(!Objects.equals(str2, str3));
        }).orElse(true)).booleanValue();
    }

    private static boolean needsRemoval(Map<String, Optional<String>> map, String str) {
        Optional<String> optional = map.get(str);
        return optional != null && ((Boolean) optional.map(str2 -> {
            return Boolean.valueOf(!str2.isEmpty());
        }).orElse(true)).booleanValue();
    }

    private static Comparator<TopicPartition> topicPartitionComparator() {
        return new Comparator<TopicPartition>() { // from class: io.confluent.kafka.databalancing.throttle.Throttler.1
            @Override // java.util.Comparator
            public int compare(TopicPartition topicPartition, TopicPartition topicPartition2) {
                if (topicPartition == null && topicPartition2 == null) {
                    return 0;
                }
                if (topicPartition2 == null) {
                    return -1;
                }
                if (topicPartition == null) {
                    return 1;
                }
                int compareTo = topicPartition.topic().compareTo(topicPartition2.topic());
                return compareTo == 0 ? Integer.compare(topicPartition.partition(), topicPartition2.partition()) : compareTo;
            }
        };
    }
}
