/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.stream.log.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.Name;

/**
 * Thin convenience wrapper around a Kafka {@link AdminClient}: topic creation, lookup and deletion,
 * consumer group listing, and static helpers that compute partition assignments with Kafka's own
 * assignors.
 * <p>
 * The instance owns its {@link AdminClient}; call {@link #close()} (or use try-with-resources) to
 * release it.
 */
public class KafkaUtils implements AutoCloseable {
    private static final Log log = LogFactory.getLog(KafkaUtils.class);

    /** System property holding the Kafka bootstrap servers. */
    public static final String BOOTSTRAP_SERVERS_PROP = "kafka.bootstrap.servers";

    /** Bootstrap servers used when {@link #BOOTSTRAP_SERVERS_PROP} is unset or empty. */
    public static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";

    protected final AdminClient adminClient;

    /** Cached consumer group ids, refreshed by {@link #listAllConsumers()}. */
    protected volatile List<String> allConsumers;

    /** Timestamp (epoch millis) of the last refresh that returned a non-empty list. */
    protected volatile long allConsumersTime;

    /** Maximum age of the consumer group cache before it is refreshed. */
    protected static final long ALL_CONSUMERS_CACHE_TIMEOUT_MS = 2000L;

    /** Maximum time given to the admin client to close. */
    protected static final long ADMIN_CLIENT_CLOSE_TIMEOUT_S = 5L;

    /**
     * Creates an instance backed by an admin client built from {@link #getDefaultAdminProperties()}.
     */
    public KafkaUtils() {
        this(getDefaultAdminProperties());
    }

    /**
     * Creates an instance backed by an admin client built from the given properties.
     *
     * @param adminProperties the configuration passed to {@link AdminClient#create(Properties)}
     */
    public KafkaUtils(Properties adminProperties) {
        this.adminClient = AdminClient.create(adminProperties);
    }

    /**
     * Builds the default admin client configuration: the bootstrap servers returned by
     * {@link #getBootstrapServers()} and a request timeout of 10000 ms.
     *
     * @return a new, mutable {@link Properties} instance
     */
    public static Properties getDefaultAdminProperties() {
        Properties ret = new Properties();
        ret.put("bootstrap.servers", getBootstrapServers());
        ret.put("request.timeout.ms", 10000);
        return ret;
    }

    /**
     * Returns the bootstrap servers read from the {@link #BOOTSTRAP_SERVERS_PROP} system property.
     *
     * @return the property value, or {@link #DEFAULT_BOOTSTRAP_SERVERS} when it is unset or empty
     */
    public static String getBootstrapServers() {
        String bootstrapServers = System.getProperty(BOOTSTRAP_SERVERS_PROP);
        if (bootstrapServers == null || bootstrapServers.isEmpty()) {
            return DEFAULT_BOOTSTRAP_SERVERS;
        }
        return bootstrapServers;
    }

    /**
     * Probes the cluster configured by {@link #getDefaultAdminProperties()} by asking for its nodes.
     *
     * @return {@code true} if the node list is obtained within 5 seconds, {@code false} on timeout
     * @throws StreamRuntimeException if the calling thread is interrupted or the request fails
     */
    public static boolean kafkaDetected() {
        AdminClient client = AdminClient.create(getDefaultAdminProperties());
        try {
            client.describeCluster().nodes().get(5L, TimeUnit.SECONDS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException(e);
        } catch (TimeoutException e) {
            return false;
        } finally {
            // Bounded close so that an unreachable cluster does not stall the probe.
            client.close(Duration.ofSeconds(1L));
        }
    }

    /**
     * Computes the partition assignment of the given streams over {@code threads} consumers using
     * Kafka's {@link RangeAssignor}.
     *
     * @param threads the number of consumers
     * @param streams the number of partitions per stream name
     * @return one list of partitions per consumer, indexed from 0 to {@code threads - 1}
     */
    public static List<List<LogPartition>> rangeAssignments(int threads, Map<String, Integer> streams) {
        return assignments(new RangeAssignor(), threads, streams);
    }

    /**
     * Computes the partition assignment of the given streams over {@code threads} consumers using
     * Kafka's {@link RoundRobinAssignor}.
     *
     * @param threads the number of consumers
     * @param streams the number of partitions per stream name
     * @return one list of partitions per consumer, indexed from 0 to {@code threads - 1}
     */
    public static List<List<LogPartition>> roundRobinAssignments(int threads, Map<String, Integer> streams) {
        return assignments(new RoundRobinAssignor(), threads, streams);
    }

    /**
     * Runs the given assignor against a synthetic cluster that only contains the requested
     * partitions, with {@code threads} members named {@code "0"} to {@code "threads - 1"} that all
     * subscribe to every stream.
     *
     * @param assignor the Kafka assignor to apply
     * @param threads the number of consumers
     * @param streams the number of partitions per stream name
     * @return one list of partitions per consumer, indexed from 0 to {@code threads - 1}
     */
    protected static List<List<LogPartition>> assignments(ConsumerPartitionAssignor assignor, int threads,
            Map<String, Integer> streams) {
        List<PartitionInfo> parts = new ArrayList<>();
        streams.forEach((streamName, size) -> parts.addAll(getPartsFor(streamName, size)));

        // Every member subscribes to the same, sorted list of streams.
        List<String> streamNames = streams.keySet().stream().sorted().collect(Collectors.toList());
        Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<>();
        for (int i = 0; i < threads; i++) {
            subscriptions.put(String.valueOf(i), new ConsumerPartitionAssignor.Subscription(streamNames));
        }

        Cluster cluster = new Cluster("kafka-cluster", Collections.emptyList(), parts, Collections.emptySet(),
                Collections.emptySet());
        Map<String, ConsumerPartitionAssignor.Assignment> assignments = assignor.assign(cluster,
                new ConsumerPartitionAssignor.GroupSubscription(subscriptions)).groupAssignment();

        List<List<LogPartition>> ret = new ArrayList<>(threads);
        for (int i = 0; i < threads; i++) {
            ret.add(assignments.get(String.valueOf(i))
                               .partitions()
                               .stream()
                               .map(part -> new LogPartition(Name.ofUrn(part.topic()), part.partition()))
                               .collect(Collectors.toList()));
        }
        return ret;
    }

    /**
     * Builds placeholder partition descriptors (no leader, replicas or in-sync replicas) for a topic.
     *
     * @param topic the topic name
     * @param partitions the number of partitions to describe
     * @return descriptors for partitions {@code 0} to {@code partitions - 1}
     */
    protected static Collection<PartitionInfo> getPartsFor(String topic, int partitions) {
        List<PartitionInfo> ret = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            ret.add(new PartitionInfo(topic, i, null, null, null));
        }
        return ret;
    }

    /**
     * Creates a topic with a replication factor of 1.
     *
     * @param topic the topic name
     * @param partitions the number of partitions
     * @see #createTopic(String, int, short)
     */
    public void createTopicWithoutReplication(String topic, int partitions) {
        createTopic(topic, partitions, (short) 1);
    }

    /**
     * Creates a topic and waits up to 5 minutes for the request to complete. If the partition count
     * reported afterwards differs from the requested one, waits up to 2 more minutes for the topic
     * to become visible.
     *
     * @param topic the topic name
     * @param partitions the number of partitions
     * @param replicationFactor the replication factor
     * @throws StreamRuntimeException if the calling thread is interrupted, the creation fails
     *             (including when the topic already exists) or a timeout is reached
     */
    public void createTopic(String topic, int partitions, short replicationFactor) {
        log.info("Creating topic: " + topic + ", partitions: " + partitions + ", replications: " + replicationFactor);
        CreateTopicsResult ret = adminClient.createTopics(
                Collections.singletonList(new NewTopic(topic, partitions, replicationFactor)));
        try {
            ret.all().get(5L, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            // NOTE(review): an already existing topic is logged at info level but is still reported
            // as a failure to the caller; confirm this is intended before changing it.
            if (e.getCause() instanceof TopicExistsException) {
                log.info("topic: " + topic + " exists");
            }
            throw new StreamRuntimeException(e);
        } catch (TimeoutException e) {
            throw new StreamRuntimeException("Unable to create topic " + topic + " within the timeout", e);
        }
        if (partitions(topic) != partitions) {
            waitForTopicCreation(topic, Duration.ofMinutes(2L));
        }
    }

    /**
     * Polls every 5 seconds until {@link #topicExists(String)} returns {@code true}. The deadline is
     * checked before each sleep, so the first existence check happens after one sleep.
     *
     * @param topic the topic name
     * @param timeout the maximum time to wait
     * @throws StreamRuntimeException if the deadline is exceeded or the calling thread is interrupted
     */
    protected void waitForTopicCreation(String topic, Duration timeout) {
        log.warn("Waiting for brokers to become aware that the topic " + topic + " has been created.");
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        do {
            if (System.currentTimeMillis() > deadline) {
                throw new StreamRuntimeException(new TimeoutException(
                        "Timeout while waiting for topic " + topic + " metadata propagation in the cluster"));
            }
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new StreamRuntimeException("Interrupted while waiting for topic creation " + topic, e);
            }
        } while (!topicExists(topic));
        log.debug("Topic is now available");
    }

    /**
     * Tells whether a topic is known to the cluster.
     *
     * @param topic the topic name
     * @return {@code true} if {@link #partitions(String)} reports at least one partition
     */
    public boolean topicExists(String topic) {
        return partitions(topic) > 0;
    }

    /**
     * Returns the number of partitions of a topic.
     *
     * @param topic the topic name
     * @return the partition count, or {@code -1} if the topic is unknown
     * @throws StreamRuntimeException if the calling thread is interrupted or the request fails for
     *             another reason than an unknown topic
     */
    public int partitions(String topic) {
        try {
            TopicDescription desc = adminClient.describeTopics(Collections.singletonList(topic))
                                               .values()
                                               .get(topic)
                                               .get();
            if (log.isDebugEnabled()) {
                log.debug(String.format("Topic %s exists: %s", topic, desc));
            }
            return desc.partitions().size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                return -1;
            }
            throw new StreamRuntimeException(e);
        }
    }

    /**
     * Lists the topic names returned by the admin client.
     *
     * @return the topic names
     * @throws StreamRuntimeException if the calling thread is interrupted or the request fails
     */
    public Set<String> listTopics() {
        return await(adminClient.listTopics().names());
    }

    /**
     * Lists the consumer groups that have committed offsets on the given topic.
     *
     * @param topic the topic name
     * @return the matching consumer group ids
     * @throws StreamRuntimeException if the calling thread is interrupted or a request fails
     */
    public List<String> listConsumers(String topic) {
        return listAllConsumers().stream()
                                 .filter(consumer -> getConsumerTopics(consumer).contains(topic))
                                 .collect(Collectors.toList());
    }

    /**
     * Returns the topic of every partition for which the group has committed offsets; a topic
     * appears once per partition.
     *
     * @param group the consumer group id
     * @return the topic names, possibly with duplicates
     * @throws StreamRuntimeException if the calling thread is interrupted or the request fails
     */
    protected List<String> getConsumerTopics(String group) {
        return await(adminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata())
                .keySet()
                .stream()
                .map(TopicPartition::topic)
                .collect(Collectors.toList());
    }

    /**
     * Lists all consumer group ids, using a cache that is refreshed once it is older than
     * {@link #ALL_CONSUMERS_CACHE_TIMEOUT_MS}. An empty result does not update the cache timestamp,
     * so the next call may query the cluster again.
     *
     * @return the consumer group ids
     * @throws StreamRuntimeException if the calling thread is interrupted or the request fails
     */
    public synchronized List<String> listAllConsumers() {
        long now = System.currentTimeMillis();
        if (allConsumers == null || now - allConsumersTime > ALL_CONSUMERS_CACHE_TIMEOUT_MS) {
            allConsumers = await(adminClient.listConsumerGroups().all()).stream()
                                                                        .map(ConsumerGroupListing::groupId)
                                                                        .collect(Collectors.toList());
            if (!allConsumers.isEmpty()) {
                allConsumersTime = now;
            }
        }
        return allConsumers;
    }

    /**
     * Returns the number of partitions of a topic. Unlike {@link #partitions(String)}, an unknown
     * topic is reported as an exception.
     *
     * @param topic the topic name
     * @return the partition count
     * @throws StreamRuntimeException if the calling thread is interrupted or the request fails
     */
    public int getNumberOfPartitions(String topic) {
        DescribeTopicsResult descriptions = adminClient.describeTopics(Collections.singletonList(topic));
        return await(descriptions.values().get(topic)).partitions().size();
    }

    /**
     * Closes the underlying admin client, waiting at most {@link #ADMIN_CLIENT_CLOSE_TIMEOUT_S}
     * seconds.
     */
    @Override
    public void close() {
        adminClient.close(Duration.ofSeconds(ADMIN_CLIENT_CLOSE_TIMEOUT_S));
        log.debug("Closed.");
    }

    /**
     * Requests the deletion of a topic without waiting for its completion.
     *
     * @param topic the topic name
     * @return whether the deletion future is already done when this method returns
     */
    public boolean delete(String topic) {
        log.info("Deleting topic: " + topic);
        DeleteTopicsResult result = adminClient.deleteTopics(Collections.singleton(topic));
        // NOTE(review): the future is not awaited, so this presumably returns false in most cases
        // even when the deletion eventually succeeds; confirm what callers expect.
        return result.values().get(topic).isDone();
    }

    /**
     * Blocks until the future completes and returns its value, restoring the interrupt flag and
     * wrapping failures into a {@link StreamRuntimeException}.
     */
    private static <T> T await(KafkaFuture<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        } catch (ExecutionException e) {
            throw new StreamRuntimeException(e);
        }
    }
}

