package org.nuxeo.lib.stream.log.kafka;

import java.io.Externalizable;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;

/* loaded from: input_file:org/nuxeo/lib/stream/log/kafka/KafkaLogManager.class */
public class KafkaLogManager extends AbstractLogManager {
    public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable";
    public static final String DEFAULT_REPLICATION_FACTOR_PROP = "default.replication.factor";
    protected final KafkaUtils kUtils;
    protected final Properties producerProperties;
    protected final Properties consumerProperties;
    protected final Properties adminProperties;
    protected final String prefix;
    protected final short defaultReplicationFactor;
    protected final boolean disableSubscribe;
    protected final KafkaNamespace ns;

    @Deprecated
    public KafkaLogManager(String str, String str2, Properties properties, Properties properties2) {
        this(str2, properties, properties2);
    }

    public KafkaLogManager(String str, Properties properties, Properties properties2) {
        this.prefix = str != null ? str : "";
        this.ns = new KafkaNamespace(this.prefix);
        this.disableSubscribe = Boolean.valueOf(properties2.getProperty(DISABLE_SUBSCRIBE_PROP, "false")).booleanValue();
        this.defaultReplicationFactor = Short.parseShort(properties.getProperty(DEFAULT_REPLICATION_FACTOR_PROP, "1"));
        this.producerProperties = normalizeProducerProperties(properties);
        this.consumerProperties = normalizeConsumerProperties(properties2);
        this.adminProperties = createAdminProperties(properties, properties2);
        this.kUtils = new KafkaUtils(this.adminProperties);
    }

    @Override // org.nuxeo.lib.stream.log.internals.AbstractLogManager
    public void create(String str, int i) {
        this.kUtils.createTopic(this.ns.getTopicName(str), i, this.defaultReplicationFactor);
    }

    @Override // org.nuxeo.lib.stream.log.internals.AbstractLogManager
    protected int getSize(String str) {
        return this.kUtils.partitions(this.ns.getTopicName(str));
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public boolean exists(String str) {
        return this.kUtils.topicExists(this.ns.getTopicName(str));
    }

    @Override // org.nuxeo.lib.stream.log.internals.AbstractLogManager
    public <M extends Externalizable> CloseableLogAppender<M> createAppender(String str, Codec<M> codec) {
        return KafkaLogAppender.open(codec, this.ns, str, this.producerProperties, this.consumerProperties);
    }

    @Override // org.nuxeo.lib.stream.log.internals.AbstractLogManager
    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> collection, String str, Codec<M> codec) {
        collection.forEach(this::checkValidPartition);
        return KafkaLogTailer.createAndAssign(codec, this.ns, collection, str, (Properties) this.consumerProperties.clone());
    }

    protected void checkValidPartition(LogPartition logPartition) {
        int numberOfPartitions = this.kUtils.getNumberOfPartitions(this.ns.getTopicName(logPartition.name()));
        if (logPartition.partition() >= numberOfPartitions) {
            throw new IllegalArgumentException("Partition out of bound " + logPartition + " max: " + numberOfPartitions);
        }
    }

    public Properties getProducerProperties() {
        return this.producerProperties;
    }

    public Properties getConsumerProperties() {
        return this.consumerProperties;
    }

    public Properties getAdminProperties() {
        return this.adminProperties;
    }

    @Override // org.nuxeo.lib.stream.log.internals.AbstractLogManager, org.nuxeo.lib.stream.log.LogManager, java.lang.AutoCloseable
    public void close() {
        super.close();
        if (this.kUtils != null) {
            this.kUtils.close();
        }
    }

    @Override // org.nuxeo.lib.stream.log.internals.AbstractLogManager, org.nuxeo.lib.stream.log.LogManager
    public boolean supportSubscribe() {
        return !this.disableSubscribe;
    }

    @Override // org.nuxeo.lib.stream.log.internals.AbstractLogManager
    protected <M extends Externalizable> LogTailer<M> doSubscribe(String str, Collection<String> collection, RebalanceListener rebalanceListener, Codec<M> codec) {
        return KafkaLogTailer.createAndSubscribe(codec, this.ns, collection, str, (Properties) this.consumerProperties.clone(), rebalanceListener);
    }

    protected Properties normalizeProducerProperties(Properties properties) {
        Properties properties2 = properties != null ? (Properties) properties.clone() : new Properties();
        properties2.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties2.put("value.serializer", "org.apache.kafka.common.serialization.BytesSerializer");
        properties2.remove(DEFAULT_REPLICATION_FACTOR_PROP);
        return properties2;
    }

    protected Properties normalizeConsumerProperties(Properties properties) {
        Properties properties2 = properties != null ? (Properties) properties.clone() : new Properties();
        properties2.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties2.put("value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer");
        properties2.put("enable.auto.commit", false);
        properties2.put("auto.offset.reset", "earliest");
        properties2.remove(DISABLE_SUBSCRIBE_PROP);
        return properties2;
    }

    protected Properties createAdminProperties(Properties properties, Properties properties2) {
        Properties properties3 = new Properties();
        properties3.put("bootstrap.servers", properties.getOrDefault("bootstrap.servers", properties2.get("bootstrap.servers")));
        return properties3;
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public List<LogLag> getLagPerPartition(String str, String str2) {
        Properties properties = (Properties) this.consumerProperties.clone();
        properties.put("group.id", this.prefix + str2);
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        Throwable th = null;
        try {
            List<TopicPartition> list = (List) kafkaConsumer.partitionsFor(this.ns.getTopicName(str)).stream().map(partitionInfo -> {
                return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            }).collect(Collectors.toList());
            LogLag[] logLagArr = new LogLag[list.size()];
            Map endOffsets = kafkaConsumer.endOffsets(list);
            for (TopicPartition topicPartition : list) {
                OffsetAndMetadata committed = kafkaConsumer.committed(topicPartition);
                long offset = committed != null ? committed.offset() : 0L;
                Long l = (Long) endOffsets.get(topicPartition);
                if (l == null) {
                    l = 0L;
                }
                logLagArr[topicPartition.partition()] = new LogLag(offset, l.longValue());
            }
            List<LogLag> asList = Arrays.asList(logLagArr);
            if (kafkaConsumer != null) {
                if (0 != 0) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            return asList;
        } catch (Throwable th3) {
            if (kafkaConsumer != null) {
                if (0 != 0) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            throw th3;
        }
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public List<String> listAll() {
        Stream<String> filter = this.kUtils.listTopics().stream().filter(str -> {
            return str.startsWith(this.prefix);
        });
        KafkaNamespace kafkaNamespace = this.ns;
        kafkaNamespace.getClass();
        return (List) filter.map(kafkaNamespace::getLogName).collect(Collectors.toList());
    }

    public String toString() {
        return "KafkaLogManager{producerProperties=" + this.producerProperties + ", consumerProperties=" + this.consumerProperties + ", prefix='" + this.prefix + "'}";
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public List<String> listConsumerGroups(String str) {
        String topicName = this.ns.getTopicName(str);
        if (!exists(str)) {
            throw new IllegalArgumentException("Unknown Log: " + str);
        }
        Stream<String> filter = this.kUtils.listConsumers(topicName).stream().filter(str2 -> {
            return str2.startsWith(this.prefix);
        });
        KafkaNamespace kafkaNamespace = this.ns;
        kafkaNamespace.getClass();
        return (List) filter.map(kafkaNamespace::getGroup).collect(Collectors.toList());
    }
}
