/*
 * Decompiled with CFR 0.152.
 */
package org.nuxeo.lib.stream.log.internals;

import java.io.Externalizable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.NoCodec;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;
import org.nuxeo.lib.stream.log.internals.LogPartitionGroup;

/**
 * Base implementation of {@link LogManager} that keeps track of the appenders and tailers it hands out.
 * <p>
 * Appenders are cached per Log name in {@link #appenders}. Tailers are registered in {@link #tailers} and, when they
 * are created from explicit partitions, in {@link #tailersAssignments} so that a second open tailer cannot be created
 * for the same (group, partition) pair. Concrete managers provide the storage specific operations through the abstract
 * methods.
 */
public abstract class AbstractLogManager implements LogManager {
    /** Consumer group used by the temporary tailers opened in {@link #getLatencyPerPartition}. */
    protected static final Name ADMIN_GROUP = Name.of("admin", "tailer");

    /** Appenders already created by this manager, keyed by Log name. */
    protected final Map<Name, CloseableLogAppender> appenders = new ConcurrentHashMap<>();

    /** Tailer registered for each (group, partition) pair by {@link #createTailer}. */
    protected final Map<LogPartitionGroup, LogTailer> tailersAssignments = new ConcurrentHashMap<>();

    /** Concurrent set of every tailer registered by {@link #createTailer} or {@link #subscribe}. */
    protected final Set<LogTailer> tailers = Collections.newSetFromMap(new ConcurrentHashMap<>());

    /** Creates the Log {@code name} with {@code size} partitions; called by {@link #createIfNotExists}. */
    protected abstract void create(Name name, int size);

    /** Returns the size of the Log {@code name}; used by {@link #size} when no appender is cached for it. */
    protected abstract int getSize(Name name);

    /** Creates a new appender for the Log {@code name}; called at most once per cached name by {@link #getAppender}. */
    protected abstract <M extends Externalizable> CloseableLogAppender<M> createAppender(Name name, Codec<M> codec);

    /** Creates a tailer on the given partitions for {@code group}; validation is done by {@link #createTailer}. */
    protected abstract <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> partitions,
            Name group, Codec<M> codec);

    /** Creates a subscribing tailer for {@code group} on the given Log names; called by {@link #subscribe}. */
    protected abstract <M extends Externalizable> LogTailer<M> doSubscribe(Name group, Collection<Name> names,
            RebalanceListener listener, Codec<M> codec);

    @Override
    public abstract List<LogLag> getLagPerPartition(Name name, Name group);

    /**
     * Creates the Log when it does not exist yet.
     *
     * @return {@code true} if the Log has been created by this call, {@code false} if it already existed
     */
    @Override
    public synchronized boolean createIfNotExists(Name name, int size) {
        if (exists(name)) {
            return false;
        }
        create(name, size);
        return true;
    }

    /**
     * Default implementation: nothing is deleted.
     *
     * @return always {@code false}
     */
    @Override
    public boolean delete(Name name) {
        return false;
    }

    /**
     * Returns the size reported by the cached appender when there is one, else falls back to {@link #getSize}.
     */
    @Override
    public int size(Name name) {
        // Single lookup: a containsKey() followed by get() could observe a concurrent close() in between and
        // dereference null.
        CloseableLogAppender appender = appenders.get(name);
        if (appender != null) {
            return appender.size();
        }
        return getSize(name);
    }

    /**
     * Creates and registers a tailer for {@code group} on the given partitions.
     * <p>
     * When {@code codec} equals {@link NoCodec#NO_CODEC} the codec is taken from an already cached appender of one of
     * the partitions, see {@link #guessCodec}.
     *
     * @throws NullPointerException if {@code codec} is null
     * @throws IllegalArgumentException if an open tailer is already registered for one of the (group, partition)
     *             pairs, if a Log does not exist, or if the codec is rejected by {@link #getAppender(Name, Codec)}
     */
    @Override
    public <M extends Externalizable> LogTailer<M> createTailer(Name group, Collection<LogPartition> partitions,
            Codec<M> codec) {
        Objects.requireNonNull(codec);
        partitions.forEach(partition -> checkInvalidAssignment(group, partition));
        Codec<M> tailerCodec = NoCodec.NO_CODEC.equals(codec) ? this.<M> guessCodec(partitions) : codec;
        partitions.forEach(partition -> checkInvalidCodec(partition, tailerCodec));
        LogTailer<M> ret = doCreateTailer(partitions, group, tailerCodec);
        // drop closed tailers before registering the new one
        cleanTailers();
        partitions.forEach(partition -> tailersAssignments.put(new LogPartitionGroup(group, partition), ret));
        tailers.add(ret);
        return ret;
    }

    /**
     * Returns the codec of the first partition whose Log has a cached appender, or {@link NoCodec#NO_CODEC} when none
     * of the partitions has one.
     */
    @SuppressWarnings("unchecked")
    protected <M extends Externalizable> Codec<M> guessCodec(Collection<LogPartition> partitions) {
        for (LogPartition partition : partitions) {
            if (appenders.containsKey(partition.name())) {
                // the message type of a cached appender is not tracked statically
                return (Codec<M>) getAppender(partition.name()).getCodec();
            }
        }
        return NoCodec.NO_CODEC;
    }

    /**
     * Default implementation: subscription is reported as not supported.
     *
     * @return always {@code false}
     */
    @Override
    public boolean supportSubscribe() {
        return false;
    }

    /**
     * Creates a subscribing tailer through {@link #doSubscribe} and registers it in {@link #tailers}.
     *
     * @throws NullPointerException if {@code codec} is null
     */
    @Override
    public <M extends Externalizable> LogTailer<M> subscribe(Name group, Collection<Name> names,
            RebalanceListener listener, Codec<M> codec) {
        Objects.requireNonNull(codec);
        LogTailer<M> ret = doSubscribe(group, names, listener, codec);
        cleanTailers();
        tailers.add(ret);
        return ret;
    }

    /**
     * Checks that a tailer can be created for the given group and partition.
     *
     * @throws IllegalArgumentException if a tailer that is not closed is already registered for this (group,
     *             partition) pair, or if the Log of the partition does not exist
     */
    protected void checkInvalidAssignment(Name group, LogPartition partition) {
        LogTailer existing = tailersAssignments.get(new LogPartitionGroup(group, partition));
        if (existing != null && !existing.closed()) {
            throw new IllegalArgumentException(
                    "Tailer for this partition already created: " + partition + ", group: " + group);
        }
        if (!exists(partition.name())) {
            throw new IllegalArgumentException("Tailer with unknown Log name: " + partition.name());
        }
    }

    /**
     * When an appender is cached for the Log of {@code partition}, delegates to {@link #getAppender(Name, Codec)}
     * which throws an {@link IllegalArgumentException} if {@code codec} is not compatible with the appender codec.
     */
    protected <M extends Externalizable> void checkInvalidCodec(LogPartition partition, Codec<M> codec) {
        if (appenders.containsKey(partition.name())) {
            getAppender(partition.name(), codec);
        }
    }

    /**
     * Returns the cached appender for the Log {@code name}, creating and caching it on first access.
     *
     * @throws IllegalArgumentException if the Log does not exist, or if an appender exists with a codec that differs
     *             from the requested one (a {@link NoCodec#NO_CODEC} request is always accepted)
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public <M extends Externalizable> LogAppender<M> getAppender(Name name, Codec<M> codec) {
        LogAppender ret = appenders.computeIfAbsent(name, n -> {
            if (exists(n)) {
                return createAppender(n, codec);
            }
            throw new IllegalArgumentException("Unknown Log name: " + n);
        });
        if (NoCodec.NO_CODEC.equals(codec) || sameCodec(ret.getCodec(), codec)) {
            return ret;
        }
        throw new IllegalArgumentException(String.format(
                "The appender for Log %s exists and expecting codec: %s, cannot use a different codec: %s", name,
                ret.getCodec(), codec));
    }

    /**
     * Returns {@code true} when both codecs are the same instance, or when neither equals {@link NoCodec#NO_CODEC} and
     * {@code codec2} is an instance of the class of {@code codec1}.
     */
    protected <M extends Externalizable> boolean sameCodec(Codec<M> codec1, Codec<M> codec2) {
        if (codec1 == codec2) {
            return true;
        }
        return !NoCodec.NO_CODEC.equals(codec1) && !NoCodec.NO_CODEC.equals(codec2)
                && codec1.getClass().isInstance(codec2);
    }

    /**
     * Builds one {@link Latency} per entry returned by {@link #getLagPerPartition}, in the same order; the position
     * of an entry in that list is used as the partition number.
     *
     * @throws IllegalStateException if reading or extracting from a record raises a {@link ClassCastException}
     * @throws StreamRuntimeException if the thread is interrupted while reading; the interrupt flag is restored
     */
    @Override
    public <M extends Externalizable> List<Latency> getLatencyPerPartition(Name name, Name group, Codec<M> codec,
            Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
        long now = System.currentTimeMillis();
        List<LogLag> lags = getLagPerPartition(name, group);
        List<Latency> ret = new ArrayList<>(lags.size());
        int partition = 0;
        for (LogLag lag : lags) {
            ret.add(latencyOfPartition(name, partition, lag, now, codec, timestampExtractor, keyExtractor));
            partition++;
        }
        return ret;
    }

    /**
     * Computes the latency of a single partition by reading the record located just before the lower offset of
     * {@code lag} with a temporary tailer of the {@link #ADMIN_GROUP}.
     */
    private <M extends Externalizable> Latency latencyOfPartition(Name name, int partition, LogLag lag, long now,
            Codec<M> codec, Function<M, Long> timestampExtractor, Function<M, String> keyExtractor) {
        if (lag.upper() == 0L || lag.lower() == 0L || lag.lag() == 0L) {
            // no record is read in these cases, report a zero timestamp and no key
            return new Latency(0L, now, lag, null);
        }
        LogOffsetImpl offset = new LogOffsetImpl(name, partition, lag.lowerOffset() - 1L);
        try (LogTailer<M> tailer = createTailer(ADMIN_GROUP, offset.partition(), codec)) {
            tailer.seek(offset);
            LogRecord<M> record = tailer.read(Duration.ofSeconds(1L));
            if (record == null) {
                return new Latency(0L, now, lag, null);
            }
            long timestamp = timestampExtractor.apply(record.message());
            String key = keyExtractor.apply(record.message());
            return new Latency(timestamp, now, lag, key);
        } catch (ClassCastException e) {
            // keep the original exception as cause so the offending types remain visible in the stack trace
            throw new IllegalStateException("Unexpected record type: " + e.getMessage(), e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamRuntimeException(e);
        }
    }

    /** Removes the closed tailers from {@link #tailers} and {@link #tailersAssignments}. */
    protected void cleanTailers() {
        tailers.removeIf(LogTailer::closed);
        tailersAssignments.values().removeIf(LogTailer::closed);
    }

    /** Closes every cached appender and every registered tailer, then clears all the registries. */
    @Override
    public void close() {
        // both collections are backed by ConcurrentHashMap, which cannot hold null, so no null filtering is needed
        appenders.values().forEach(CloseableLogAppender::close);
        appenders.clear();
        tailers.forEach(LogTailer::close);
        tailers.clear();
        tailersAssignments.clear();
    }
}

