package org.nuxeo.lib.stream.log.internals;

import java.io.Externalizable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.nuxeo.lib.stream.StreamRuntimeException;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.NoCodec;
import org.nuxeo.lib.stream.log.Latency;
import org.nuxeo.lib.stream.log.LogAppender;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogManager;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceListener;

/* loaded from: input_file:org/nuxeo/lib/stream/log/internals/AbstractLogManager.class */
public abstract class AbstractLogManager implements LogManager {
    protected final Map<String, CloseableLogAppender> appenders = new ConcurrentHashMap();
    protected final Map<LogPartitionGroup, LogTailer> tailersAssignments = new ConcurrentHashMap();
    protected final Set<LogTailer> tailers = Collections.newSetFromMap(new ConcurrentHashMap());

    protected abstract void create(String str, int i);

    protected abstract int getSize(String str);

    protected abstract <M extends Externalizable> CloseableLogAppender<M> createAppender(String str, Codec<M> codec);

    protected abstract <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> collection, String str, Codec<M> codec);

    protected abstract <M extends Externalizable> LogTailer<M> doSubscribe(String str, Collection<String> collection, RebalanceListener rebalanceListener, Codec<M> codec);

    @Override // org.nuxeo.lib.stream.log.LogManager
    public synchronized boolean createIfNotExists(String str, int i) {
        if (exists(str)) {
            return false;
        }
        create(str, i);
        return true;
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public boolean delete(String str) {
        return false;
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public int size(String str) {
        return this.appenders.containsKey(str) ? this.appenders.get(str).size() : getSize(str);
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public <M extends Externalizable> LogTailer<M> createTailer(String str, Collection<LogPartition> collection, Codec<M> codec) {
        Objects.requireNonNull(codec);
        collection.forEach(logPartition -> {
            checkInvalidAssignment(str, logPartition);
        });
        Codec<M> guessCodec = NoCodec.NO_CODEC.equals(codec) ? guessCodec(collection) : codec;
        collection.forEach(logPartition2 -> {
            checkInvalidCodec(logPartition2, guessCodec);
        });
        LogTailer<M> doCreateTailer = doCreateTailer(collection, str, guessCodec);
        collection.forEach(logPartition3 -> {
            this.tailersAssignments.put(new LogPartitionGroup(str, logPartition3), doCreateTailer);
        });
        this.tailers.add(doCreateTailer);
        return doCreateTailer;
    }

    protected <M extends Externalizable> Codec<M> guessCodec(Collection<LogPartition> collection) {
        for (LogPartition logPartition : collection) {
            if (this.appenders.containsKey(logPartition.name())) {
                return getAppender(logPartition.name()).getCodec();
            }
        }
        return NoCodec.NO_CODEC;
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public boolean supportSubscribe() {
        return false;
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public <M extends Externalizable> LogTailer<M> subscribe(String str, Collection<String> collection, RebalanceListener rebalanceListener, Codec<M> codec) {
        Objects.requireNonNull(codec);
        LogTailer<M> doSubscribe = doSubscribe(str, collection, rebalanceListener, codec);
        this.tailers.add(doSubscribe);
        return doSubscribe;
    }

    protected void checkInvalidAssignment(String str, LogPartition logPartition) {
        LogTailer logTailer = this.tailersAssignments.get(new LogPartitionGroup(str, logPartition));
        if (logTailer != null && !logTailer.closed()) {
            throw new IllegalArgumentException("Tailer for this partition already created: " + logPartition + ", group: " + str);
        }
        if (!exists(logPartition.name())) {
            throw new IllegalArgumentException("Tailer with unknown Log name: " + logPartition.name());
        }
    }

    protected void checkInvalidCodec(LogPartition logPartition, Codec codec) {
        if (this.appenders.containsKey(logPartition.name())) {
            getAppender(logPartition.name(), codec);
        }
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public synchronized <M extends Externalizable> LogAppender<M> getAppender(String str, Codec<M> codec) {
        CloseableLogAppender computeIfAbsent = this.appenders.computeIfAbsent(str, str2 -> {
            if (exists(str2)) {
                return createAppender(str2, codec);
            }
            throw new IllegalArgumentException("Unknown Log name: " + str2);
        });
        if (NoCodec.NO_CODEC.equals(codec) || sameCodec(computeIfAbsent.getCodec(), codec)) {
            return computeIfAbsent;
        }
        throw new IllegalArgumentException(String.format("The appender for Log %s exists and expecting codec: %s, cannot use a different codec: %s", str, computeIfAbsent.getCodec(), codec));
    }

    protected boolean sameCodec(Codec codec, Codec codec2) {
        return codec == codec2 || !(NoCodec.NO_CODEC.equals(codec) || NoCodec.NO_CODEC.equals(codec2) || !codec.getClass().isInstance(codec2));
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public <M extends Externalizable> List<Latency> getLatencyPerPartition(String str, String str2, Codec<M> codec, Function<M, Long> function, Function<M, String> function2) {
        long currentTimeMillis = System.currentTimeMillis();
        List<LogLag> lagPerPartition = getLagPerPartition(str, str2);
        ArrayList arrayList = new ArrayList(lagPerPartition.size());
        int i = 0;
        for (LogLag logLag : lagPerPartition) {
            if (logLag.upper() == 0 || logLag.lower() == 0) {
                arrayList.add(new Latency(0L, currentTimeMillis, logLag, null));
                i++;
            } else {
                LogOffsetImpl logOffsetImpl = new LogOffsetImpl(str, i, logLag.lowerOffset() - 1);
                try {
                    LogTailer<M> createTailer = createTailer("tools", logOffsetImpl.partition(), codec);
                    Throwable th = null;
                    try {
                        try {
                            createTailer.seek(logOffsetImpl);
                            LogRecord<M> read = createTailer.read(Duration.ofSeconds(1L));
                            if (read == null) {
                                throw new IllegalStateException("Unable to read " + logOffsetImpl + " lag: " + logLag);
                            }
                            arrayList.add(new Latency(function.apply(read.message()).longValue(), currentTimeMillis, logLag, function2.apply(read.message())));
                            if (createTailer != null) {
                                if (0 != 0) {
                                    try {
                                        createTailer.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    createTailer.close();
                                }
                            }
                            i++;
                        } finally {
                        }
                    } finally {
                    }
                } catch (ClassCastException e) {
                    throw new IllegalStateException("Unexpected record type" + e.getMessage());
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    throw new StreamRuntimeException(e2);
                }
            }
        }
        return arrayList;
    }

    @Override // org.nuxeo.lib.stream.log.LogManager, java.lang.AutoCloseable
    public void close() {
        this.appenders.values().stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).forEach((v0) -> {
            v0.close();
        });
        this.appenders.clear();
        this.tailers.stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).forEach((v0) -> {
            v0.close();
        });
        this.tailers.clear();
        this.tailersAssignments.clear();
    }
}
