/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.stream.log;

import java.io.Externalizable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogConfig;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.chronicle.ChronicleLogConfig;
import org.nuxeo.lib.stream.log.chronicle.ChronicleLogManager;
import org.nuxeo.lib.stream.log.kafka.KafkaLogConfig;
import org.nuxeo.lib.stream.log.kafka.KafkaLogManager;

public class UnifiedLogManager
implements LogManager {
    protected final List<LogConfig> configs;
    protected LogManager cqManager;
    protected LogManager kafkaManager;
    protected LogManager defaultManager;
    protected LogConfig defaultConfig;
    protected Map<LogConfig, LogManager> managers = new HashMap<LogConfig, LogManager>();

    public UnifiedLogManager(List<LogConfig> configs) {
        if (configs == null || configs.isEmpty()) {
            throw new IllegalArgumentException("No LogConfig provided");
        }
        this.configs = configs;
        this.createCQLogManager();
        this.createKafkaLogManager();
        this.findDefaultLogManger();
    }

    protected void createCQLogManager() {
        List<ChronicleLogConfig> cqConfigs = this.configs.stream().filter(config -> config instanceof ChronicleLogConfig).map(config -> (ChronicleLogConfig)config).collect(Collectors.toList());
        if (!cqConfigs.isEmpty()) {
            this.cqManager = new ChronicleLogManager(cqConfigs);
            cqConfigs.forEach(config -> this.managers.put((LogConfig)config, this.cqManager));
        }
    }

    protected void createKafkaLogManager() {
        List<KafkaLogConfig> kafkaConfigs = this.configs.stream().filter(config -> config instanceof KafkaLogConfig).map(config -> (KafkaLogConfig)config).collect(Collectors.toList());
        if (!kafkaConfigs.isEmpty()) {
            this.kafkaManager = new KafkaLogManager(kafkaConfigs);
            kafkaConfigs.forEach(config -> this.managers.put((LogConfig)config, this.kafkaManager));
        }
    }

    protected void findDefaultLogManger() {
        List defaultConfigs = this.configs.stream().filter(LogConfig::isDefault).collect(Collectors.toList());
        this.defaultConfig = defaultConfigs.isEmpty() ? this.configs.get(this.configs.size() - 1) : (LogConfig)defaultConfigs.get(defaultConfigs.size() - 1);
        this.defaultManager = this.defaultConfig instanceof ChronicleLogConfig ? this.cqManager : this.kafkaManager;
    }

    protected LogManager getManager(Name name) {
        return this.managers.get(this.configs.stream().filter(config -> config.match(name)).findFirst().orElse(this.defaultConfig));
    }

    protected LogManager getManager(Name name, Name group) {
        return this.managers.get(this.configs.stream().filter(config -> config.match(name, group)).findFirst().orElse(this.defaultConfig));
    }

    @Override
    public boolean exists(Name name) {
        return this.getManager(name).exists(name);
    }

    @Override
    public boolean createIfNotExists(Name name, int size) {
        return this.getManager(name).createIfNotExists(name, size);
    }

    @Override
    public boolean delete(Name name) {
        return this.getManager(name).delete(name);
    }

    @Override
    public int size(Name name) {
        return this.getManager(name).size(name);
    }

    @Override
    public <M extends Externalizable> LogAppender<M> getAppender(Name name, Codec<M> codec) {
        return this.getManager(name).getAppender(name, codec);
    }

    @Override
    public <M extends Externalizable> LogTailer<M> createTailer(Name group, Collection<LogPartition> partitions, Codec<M> codec) {
        if (partitions.isEmpty()) {
            return this.defaultManager.createTailer(group, partitions, codec);
        }
        Name name = partitions.iterator().next().name();
        return this.getManager(name, group).createTailer(group, partitions, codec);
    }

    @Override
    public boolean supportSubscribe() {
        return this.defaultManager.supportSubscribe();
    }

    @Override
    public <M extends Externalizable> LogTailer<M> subscribe(Name group, Collection<Name> names, RebalanceListener listener, Codec<M> codec) {
        Name name = names.iterator().next();
        return this.getManager(name, group).subscribe(group, names, listener, codec);
    }

    @Override
    public List<LogLag> getLagPerPartition(Name name, Name group) {
        return this.getManager(name, group).getLagPerPartition(name, group);
    }

    @Override
    public <M extends Externalizable> List<Latency> getLatencyPerPartition(Name name, Name group, Codec<M> codec, Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
        return this.getManager(name, group).getLatencyPerPartition(name, group, codec, timestampExtractor, keyExtractor);
    }

    @Override
    public List<Name> listAllNames() {
        ArrayList<Name> names = new ArrayList<Name>();
        if (this.kafkaManager != null) {
            names.addAll(this.kafkaManager.listAllNames());
        }
        if (this.cqManager != null) {
            names.addAll(this.cqManager.listAllNames());
        }
        return names;
    }

    @Override
    public List<Name> listConsumerGroups(Name name) {
        return this.getManager(name).listConsumerGroups(name);
    }

    @Override
    public void close() {
        if (this.kafkaManager != null) {
            this.kafkaManager.close();
        }
        if (this.cqManager != null) {
            this.cqManager.close();
        }
    }
}

