/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.telemetry.events.exporter.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RandomBrokerPartitionSubsetPartitioner
implements Partitioner {
    public static final String SUBSET_PARTITIONER_PARTITION_PERCENTAGE_CONFIG = "subset.partitioner.partition.percentage";
    public static final double DEFAULT_SUBSET_PARTITIONER_PARTITION_PERCENTAGE = 0.1875;
    private static final Logger log = LoggerFactory.getLogger(RandomBrokerPartitionSubsetPartitioner.class);
    private double partitionPercentage;
    private final ConcurrentMap<String, Map<Integer, Integer>> pastPartitionsToPreferredLeaders = new ConcurrentHashMap<String, Map<Integer, Integer>>();
    private final ConcurrentMap<String, List<Integer>> partitionsToProduceTo = new ConcurrentHashMap<String, List<Integer>>();

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List validPartitions = (List)this.partitionsToProduceTo.get(topic);
        if (validPartitions == null) {
            log.info("Partitioner has null list of partitions to produce to. Calculating partitions to produce to");
            this.onNewBatch(topic, cluster, -1);
            validPartitions = (List)this.partitionsToProduceTo.get(topic);
        }
        return (Integer)validPartitions.get(this.getRandom().nextInt(validPartitions.size()));
    }

    public synchronized void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        Map<Integer, Integer> currentPartitionsToPreferredLeadersForTopic = this.currentPartitionsToPreferredLeaders(cluster, topic);
        if (this.topicTopologyHasChanged(topic, currentPartitionsToPreferredLeadersForTopic)) {
            this.setPartitionsToWriteTo(topic, this.selectPartitions(topic, cluster));
            this.pastPartitionsToPreferredLeaders.put(topic, currentPartitionsToPreferredLeadersForTopic);
            log.info("Kafka Producer producing to the following subset partitions: {}", this.partitionsToProduceTo);
        }
    }

    public void configure(Map<String, ?> configs) {
        this.partitionPercentage = 0.1875;
        Object localPartitionPercentage = configs.get(SUBSET_PARTITIONER_PARTITION_PERCENTAGE_CONFIG);
        if (localPartitionPercentage != null) {
            try {
                this.partitionPercentage = Double.parseDouble(localPartitionPercentage.toString()) / 100.0;
            }
            catch (NumberFormatException e) {
                log.warn("Exception when trying to parse partition percentage config: %s. Using default value of 0.1875", (Throwable)e);
            }
        }
        if (this.partitionPercentage > 1.0) {
            log.warn(String.format("Configured partition percentage is above 100%%: %f. Using default value of 0.1875", this.partitionPercentage));
            this.partitionPercentage = 0.1875;
        }
    }

    private boolean topicTopologyHasChanged(String topic, Map<Integer, Integer> currentPartitionsToPreferredLeaders) {
        return !Objects.equals(this.pastPartitionsToPreferredLeaders.get(topic), currentPartitionsToPreferredLeaders);
    }

    private static Stream<Integer> selectRandomPartitions(List<Integer> partitions, int numPartitions) {
        Collections.shuffle(partitions);
        return partitions.stream().limit(numPartitions);
    }

    private List<Integer> selectPartitions(String topic, Cluster cluster) {
        List allPartitions = cluster.partitionsForTopic(topic);
        if (allPartitions.isEmpty()) {
            return new ArrayList<Integer>(Collections.singletonList(0));
        }
        int numPartitionsToSelect = (int)Math.max(Math.round((double)allPartitions.size() * this.partitionPercentage), 1L);
        List allAvailablePartitions = cluster.availablePartitionsForTopic(topic);
        if (allAvailablePartitions.size() < numPartitionsToSelect) {
            return RandomBrokerPartitionSubsetPartitioner.selectRandomPartitions(allPartitions.stream().map(PartitionInfo::partition).collect(Collectors.toCollection(ArrayList::new)), numPartitionsToSelect).collect(Collectors.toList());
        }
        ArrayList<Integer> partitionsList = new ArrayList<Integer>(numPartitionsToSelect);
        ArrayList shuffledBrokers = new ArrayList(cluster.nodes());
        Collections.shuffle(shuffledBrokers);
        for (Node shuffledBroker : shuffledBrokers) {
            List partitionsToAdd = allAvailablePartitions.stream().filter(p -> p.leader().id() == shuffledBroker.id()).map(PartitionInfo::partition).collect(Collectors.toCollection(ArrayList::new));
            if (partitionsList.size() + partitionsToAdd.size() > numPartitionsToSelect) {
                int remaining = numPartitionsToSelect - partitionsList.size();
                partitionsList.addAll(RandomBrokerPartitionSubsetPartitioner.selectRandomPartitions(partitionsToAdd, remaining).collect(Collectors.toList()));
                return partitionsList;
            }
            partitionsList.addAll(partitionsToAdd);
            if (partitionsList.size() != numPartitionsToSelect) continue;
            break;
        }
        return partitionsList;
    }

    private void setPartitionsToWriteTo(String topic, List<Integer> partitionsList) {
        this.partitionsToProduceTo.put(topic, partitionsList);
    }

    public void close() {
    }

    private Map<Integer, Integer> currentPartitionsToPreferredLeaders(Cluster cluster, String topic) {
        return cluster.availablePartitionsForTopic(topic).stream().collect(Collectors.toMap(PartitionInfo::partition, partition -> {
            Node[] replicas = partition.replicas();
            return replicas.length == 0 ? -1 : replicas[0].id();
        }));
    }

    private Random getRandom() {
        return ThreadLocalRandom.current();
    }
}

