package org.nuxeo.lib.stream.log.kafka;

import java.io.Externalizable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.NameResolver;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;

/* loaded from: input_file:org/nuxeo/lib/stream/log/kafka/KafkaLogManager.class */
/**
 * Log manager backed by Kafka.
 *
 * <p>The manager is driven by an ordered list of {@link KafkaLogConfig}. For a given log (and optionally a
 * consumer group) the first configuration whose {@code match(...)} accepts it is used, falling back to
 * {@link #defaultConfig} when none matches. One {@link KafkaUtils} instance is created per configuration from
 * its admin properties and is closed by {@link #close()}.
 */
public class KafkaLogManager extends AbstractLogManager {
    /** Property key {@code "subscribe.disable"}. It is declared here but not read anywhere in this class. */
    public static final String DISABLE_SUBSCRIBE_PROP = "subscribe.disable";

    /** Configurations in lookup order; this is the very list passed to the constructor (no copy is made). */
    protected final List<KafkaLogConfig> configs;

    /** Configuration used when no entry of {@link #configs} matches; see {@link #findDefaultConfig()}. */
    protected final KafkaLogConfig defaultConfig;

    /** One {@link KafkaUtils} per configuration, built from {@code getAdminProperties()}. */
    protected final Map<KafkaLogConfig, KafkaUtils> kUtils;

    /**
     * Creates a manager with a single configuration named {@code "unknown"} and flagged as default.
     *
     * @param str forwarded as the fourth argument of the {@link KafkaLogConfig} constructor
     *            (presumably the topic/group prefix; confirm against {@code KafkaLogConfig})
     * @param properties forwarded as the sixth argument of the {@link KafkaLogConfig} constructor
     *            (presumably the producer properties; confirm)
     * @param properties2 forwarded as the seventh argument of the {@link KafkaLogConfig} constructor
     *            (presumably the consumer properties; confirm)
     */
    public KafkaLogManager(String str, Properties properties, Properties properties2) {
        this(Collections.singletonList(
                new KafkaLogConfig("unknown", true, Collections.emptyList(), str, null, properties, properties2)));
    }

    /**
     * Creates a manager from an ordered list of configurations.
     *
     * @param list the configurations, must contain at least one entry
     * @throws IllegalArgumentException if {@code list} is {@code null} or empty
     */
    public KafkaLogManager(List<KafkaLogConfig> list) {
        // Either condition alone makes the manager unusable, hence "or" and not "and".
        if (list == null || list.isEmpty()) {
            throw new IllegalArgumentException("config required");
        }
        this.kUtils = new HashMap<>();
        this.configs = list;
        this.defaultConfig = findDefaultConfig();
        for (KafkaLogConfig config : this.configs) {
            this.kUtils.put(config, new KafkaUtils(config.getAdminProperties()));
        }
    }

    /**
     * Selects the fallback configuration: the last entry of {@link #configs} reporting {@code isDefault()},
     * or simply the last entry when none is flagged as default.
     *
     * @return the fallback configuration
     */
    protected KafkaLogConfig findDefaultConfig() {
        List<KafkaLogConfig> flaggedDefault = this.configs.stream()
                .filter(KafkaLogConfig::isDefault)
                .collect(Collectors.toList());
        List<KafkaLogConfig> candidates = flaggedDefault.isEmpty() ? this.configs : flaggedDefault;
        return candidates.get(candidates.size() - 1);
    }

    /**
     * Returns the first configuration matching the given name, or {@link #defaultConfig} when none matches.
     *
     * @param name the name handed to {@code KafkaLogConfig.match(Name)}
     * @return the configuration to use for {@code name}
     */
    protected KafkaLogConfig getConfig(Name name) {
        return this.configs.stream()
                .filter(candidate -> candidate.match(name))
                .findFirst()
                .orElse(this.defaultConfig);
    }

    /**
     * Returns the first configuration matching the given pair of names, or {@link #defaultConfig} when none
     * matches.
     *
     * @param name first argument of {@code KafkaLogConfig.match(Name, Name)}; callers in this class pass a log name
     * @param name2 second argument of {@code KafkaLogConfig.match(Name, Name)}; callers in this class pass a
     *            consumer group name
     * @return the configuration to use for this pair
     */
    protected KafkaLogConfig getConfig(Name name, Name name2) {
        return this.configs.stream()
                .filter(candidate -> candidate.match(name, name2))
                .findFirst()
                .orElse(this.defaultConfig);
    }

    /**
     * Creates the topic backing a log, using the replication factor of the matching configuration.
     *
     * @param name the log name
     * @param i the value passed to {@code KafkaUtils.createTopic} as its second argument
     *            (the number of partitions, judging by {@link #getSize(Name)}; confirm)
     */
    @Override
    public void create(Name name, int i) {
        KafkaLogConfig config = getConfig(name);
        String topic = config.getResolver().getId(name);
        this.kUtils.get(config).createTopic(topic, i, config.getReplicatorFactor());
    }

    /**
     * Returns what {@code KafkaUtils.partitions} reports for the topic backing the given log.
     *
     * @param name the log name
     * @return the value returned by {@code KafkaUtils.partitions} for the resolved topic
     */
    @Override
    protected int getSize(Name name) {
        KafkaLogConfig config = getConfig(name);
        String topic = config.getResolver().getId(name);
        return this.kUtils.get(config).partitions(topic);
    }

    /**
     * Tells whether the topic backing the given log exists.
     *
     * @param name the log name
     * @return the result of {@code KafkaUtils.topicExists} for the resolved topic
     */
    @Override
    public boolean exists(Name name) {
        KafkaLogConfig config = getConfig(name);
        String topic = config.getResolver().getId(name);
        return this.kUtils.get(config).topicExists(topic);
    }

    /**
     * Opens an appender on the given log with the producer and consumer properties of the matching configuration.
     *
     * @param name the log name
     * @param codec the codec handed to the appender
     * @return the appender returned by {@code KafkaLogAppender.open}
     */
    @Override
    public <M extends Externalizable> CloseableLogAppender<M> createAppender(Name name, Codec<M> codec) {
        KafkaLogConfig config = getConfig(name);
        return KafkaLogAppender.open(codec, config.getResolver(), name, config.getProducerProperties(),
                config.getConsumerProperties());
    }

    /**
     * Creates a tailer assigned to the given partitions.
     *
     * <p>Every partition is validated first. The configuration is picked from the name of the first partition;
     * when the collection is empty the default configuration is used. The tailer receives a clone of the
     * consumer properties so the configuration itself is never mutated.
     *
     * @param collection the partitions to assign
     * @param name passed through to {@code KafkaLogTailer.createAndAssign} (presumably the consumer group; confirm)
     * @param codec the codec handed to the tailer
     * @return the tailer returned by {@code KafkaLogTailer.createAndAssign}
     * @throws IllegalArgumentException if a partition index is out of bound, see
     *             {@link #checkValidPartition(LogPartition)}
     */
    @Override
    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> collection, Name name,
            Codec<M> codec) {
        collection.forEach(this::checkValidPartition);
        KafkaLogConfig config = collection.isEmpty()
                ? this.defaultConfig
                : getConfig(collection.iterator().next().name());
        Properties consumerProps = (Properties) config.getConsumerProperties().clone();
        return KafkaLogTailer.createAndAssign(codec, config.getResolver(), collection, name, consumerProps);
    }

    /**
     * Verifies that the partition index is below the number of partitions of its topic.
     *
     * @param logPartition the partition to check
     * @throws IllegalArgumentException if the index is greater than or equal to the number of partitions
     */
    protected void checkValidPartition(LogPartition logPartition) {
        KafkaLogConfig config = getConfig(logPartition.name());
        String topic = config.getResolver().getId(logPartition.name());
        int numberOfPartitions = this.kUtils.get(config).getNumberOfPartitions(topic);
        if (logPartition.partition() >= numberOfPartitions) {
            throw new IllegalArgumentException(
                    "Partition out of bound " + logPartition + " max: " + numberOfPartitions);
        }
    }

    /**
     * Closes the parent manager, then the {@link KafkaUtils} of every configuration, in configuration order.
     */
    @Override
    public void close() {
        super.close();
        for (KafkaLogConfig config : this.configs) {
            this.kUtils.get(config).close();
        }
    }

    /**
     * Tells whether subscription is supported, which is decided by the default configuration only.
     *
     * @return {@code true} unless the default configuration reports {@code getDisableSubscribe()} as true
     */
    @Override
    public boolean supportSubscribe() {
        return !this.defaultConfig.getDisableSubscribe().booleanValue();
    }

    /**
     * Creates a tailer subscribed to the given logs.
     *
     * <p>The configuration is selected from the first element of {@code collection} together with {@code name};
     * an empty collection therefore fails with {@link java.util.NoSuchElementException}. The tailer receives a
     * clone of the consumer properties.
     *
     * @param name second argument of the configuration lookup, also passed to
     *            {@code KafkaLogTailer.createAndSubscribe} (presumably the consumer group; confirm)
     * @param collection the names to subscribe to, must not be empty
     * @param rebalanceListener the listener handed to the tailer
     * @param codec the codec handed to the tailer
     * @return the tailer returned by {@code KafkaLogTailer.createAndSubscribe}
     */
    @Override
    protected <M extends Externalizable> LogTailer<M> doSubscribe(Name name, Collection<Name> collection,
            RebalanceListener rebalanceListener, Codec<M> codec) {
        KafkaLogConfig config = getConfig(collection.iterator().next(), name);
        Properties consumerProps = (Properties) config.getConsumerProperties().clone();
        return KafkaLogTailer.createAndSubscribe(codec, config.getResolver(), collection, name, consumerProps,
                rebalanceListener);
    }

    /**
     * Computes, for each partition of a log, the committed offset of a consumer group and the end offset.
     *
     * <p>A short-lived {@link KafkaConsumer} is created with {@code group.id} set to the resolved id of
     * {@code name2} and {@code client.id} set to that id suffixed with {@code "-lag"}. A partition without a
     * committed offset, or without a reported end offset, contributes {@code 0} for the missing value. The
     * result is indexed by partition number, so the partition numbers reported by Kafka are expected to be
     * {@code 0..n-1}.
     *
     * @param name the log name
     * @param name2 the consumer group name
     * @return one {@link LogLag} per partition, as a fixed-size list ordered by partition number
     */
    @Override
    public List<LogLag> getLagPerPartition(Name name, Name name2) {
        KafkaLogConfig config = getConfig(name, name2);
        Properties consumerProps = (Properties) config.getConsumerProperties().clone();
        consumerProps.put("group.id", config.getResolver().getId(name2));
        consumerProps.put("client.id", config.getResolver().getId(name2) + "-lag");
        // Class-wide lock: at most one lag consumer is alive at a time across all manager instances.
        // NOTE(review): presumably this avoids concurrent consumers sharing the same client/group ids; confirm.
        synchronized (KafkaLogManager.class) {
            // try-with-resources so the consumer is released even when a Kafka call below throws.
            try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(consumerProps)) {
                Set<TopicPartition> topicPartitions = consumer.partitionsFor(config.getResolver().getId(name))
                        .stream()
                        .map(info -> new TopicPartition(info.topic(), info.partition()))
                        .collect(Collectors.toSet());
                LogLag[] lags = new LogLag[topicPartitions.size()];
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
                Map<TopicPartition, OffsetAndMetadata> committedOffsets = consumer.committed(topicPartitions);
                for (TopicPartition topicPartition : topicPartitions) {
                    OffsetAndMetadata committed = committedOffsets.get(topicPartition);
                    long lowerOffset = committed != null ? committed.offset() : 0L;
                    Long end = endOffsets.get(topicPartition);
                    long upperOffset = end != null ? end.longValue() : 0L;
                    lags[topicPartition.partition()] = new LogLag(lowerOffset, upperOffset);
                }
                return Arrays.asList(lags);
            }
        }
    }

    /**
     * Lists the names of all logs visible through the default configuration.
     *
     * <p>Topics are listed with the {@link KafkaUtils} of the default configuration only. Each topic is then
     * converted to a name by every configuration whose resolver prefix it starts with; duplicates are removed.
     *
     * @return the distinct names, in no particular order
     */
    @Override
    public List<Name> listAllNames() {
        Set<String> topics = this.kUtils.get(this.defaultConfig).listTopics();
        Set<Name> names = new HashSet<>(topics.size());
        for (String topic : topics) {
            for (KafkaLogConfig config : this.configs) {
                if (topic.startsWith(config.getResolver().getPrefix())) {
                    names.add(config.getResolver().getName(topic));
                }
            }
        }
        return new ArrayList<>(names);
    }

    /**
     * Returns a diagnostic description listing the configurations, the default one and its resolver.
     */
    @Override
    public String toString() {
        return "KafkaLogManager{configs=" + this.configs + ", defaultConfig=" + this.defaultConfig
                + ", defaultResolver=" + this.defaultConfig.getResolver() + "}";
    }

    /**
     * Renders properties as text with password values masked.
     *
     * <p>The text comes from {@link Properties#toString()}; when it contains {@code "password"}, every
     * {@code password=<value>} occurrence is replaced by {@code password=****}, where the value runs up to the
     * next double quote, semicolon, comma or space.
     *
     * @param properties the properties to display
     * @return the textual form, masked where needed
     */
    protected String filterDisplayedProperties(Properties properties) {
        String text = properties.toString();
        if (!text.contains("password")) {
            return text;
        }
        return text.replaceAll("password=.[^\\\"\\;\\,\\ ]*", "password=****");
    }

    /**
     * Lists the consumer groups of a log.
     *
     * <p>Only the consumers whose id starts with the resolver prefix of the matching configuration are kept;
     * each is converted back to a {@link Name} by that resolver.
     *
     * @param name the log name
     * @return the names of the matching consumer groups
     * @throws IllegalArgumentException if the topic backing {@code name} does not exist
     */
    @Override
    public List<Name> listConsumerGroups(Name name) {
        KafkaLogConfig config = getConfig(name);
        String topic = config.getResolver().getId(name);
        if (!this.kUtils.get(config).topicExists(topic)) {
            throw new IllegalArgumentException("Unknown Log: " + name);
        }
        List<String> prefixedGroups = this.kUtils.get(config)
                .listConsumers(topic)
                .stream()
                .filter(group -> group.startsWith(config.getResolver().getPrefix()))
                .collect(Collectors.toList());
        NameResolver resolver = Objects.requireNonNull(config.getResolver());
        return prefixedGroups.stream().map(resolver::getName).collect(Collectors.toList());
    }

    /**
     * Deletes the topic backing the given log.
     *
     * @param name the log name
     * @return the result of {@code KafkaUtils.delete} for the resolved topic
     */
    @Override
    public boolean delete(Name name) {
        KafkaLogConfig config = getConfig(name);
        String topic = config.getResolver().getId(name);
        return this.kUtils.get(config).delete(topic);
    }
}
