package org.nuxeo.lib.stream.log.chronicle;

import java.io.Externalizable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogRecord;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.Name;

/* loaded from: input_file:org/nuxeo/lib/stream/log/chronicle/ChronicleCompoundLogTailer.class */
/**
 * A {@link LogTailer} that wraps several {@link ChronicleLogTailer}s and exposes them as a single tailer.
 * <p>
 * Records are read from the wrapped tailers in round-robin order. Positioning, commit and reset operations are
 * delegated either to every wrapped tailer or to the first one whose assignments contain the requested partition.
 *
 * @param <M> the type of the messages carried by the records
 */
public class ChronicleCompoundLogTailer<M extends Externalizable> implements LogTailer<M> {
    /** Longest pause, in milliseconds, between two polling attempts in {@link #read(Duration)}. */
    private static final long MAX_POLL_INTERVAL_MS = 100L;

    /** Value returned by {@link #group()}. */
    protected final Name group;

    /** Number of wrapped tailers, captured at construction time. */
    protected final int size;

    /** Codec of the first wrapped tailer, or {@code null} when there is no tailer. */
    protected final Codec<M> codec;

    /** Set to {@code true} once {@link #close()} has run. */
    protected boolean closed;

    /** Monotonically increasing cursor used to pick the next tailer in round-robin order. */
    protected long counter;

    protected final List<ChronicleLogTailer<M>> tailers = new ArrayList<>();

    /** Union of the assignments of all wrapped tailers, in tailer order. */
    protected final List<LogPartition> logPartitions = new ArrayList<>();

    /**
     * Creates a compound tailer over the given tailers.
     *
     * @param collection the tailers to wrap; the codec of the first one (in iteration order) is used as the codec
     *            of this compound tailer
     * @param name the group reported by {@link #group()}
     */
    public ChronicleCompoundLogTailer(Collection<ChronicleLogTailer<M>> collection, Name name) {
        this.tailers.addAll(collection);
        this.group = name;
        this.size = collection.size();
        if (collection.isEmpty()) {
            this.codec = null;
        } else {
            this.codec = collection.iterator().next().getCodec();
        }
        collection.forEach(chronicleLogTailer -> this.logPartitions.addAll(chronicleLogTailer.assignments()));
    }

    /**
     * Reads the next available record, polling the wrapped tailers until one is found or the timeout elapses.
     * <p>
     * A first read is always attempted. When nothing is available the thread sleeps between attempts, at most
     * {@value #MAX_POLL_INTERVAL_MS} ms at a time and never longer than the time left before the timeout.
     *
     * @param duration the maximum time to wait for a record
     * @return the record read, or {@code null} when none became available within the timeout
     * @throws InterruptedException if the thread is interrupted while sleeping between attempts
     */
    @Override
    public LogRecord<M> read(Duration duration) throws InterruptedException {
        LogRecord<M> logRecord = read();
        if (logRecord != null) {
            return logRecord;
        }
        final long timeoutMs = duration.toMillis();
        // nanoTime is monotonic, so the wait is not affected by wall-clock adjustments, and working on
        // elapsed time avoids the overflow of "now + timeout" for very large durations.
        final long startNanos = System.nanoTime();
        while (logRecord == null) {
            long elapsedMs = (System.nanoTime() - startNanos) / 1_000_000L;
            if (elapsedMs >= timeoutMs) {
                break;
            }
            // Never sleep past the deadline.
            Thread.sleep(Math.min(MAX_POLL_INTERVAL_MS, timeoutMs - elapsedMs));
            logRecord = read();
        }
        return logRecord;
    }

    /**
     * Performs one round-robin pass over the wrapped tailers, starting after the tailer used by the previous call.
     *
     * @return the first record found, or {@code null} when there is no tailer or every tailer returned
     *         {@code null} during the pass
     */
    protected LogRecord<M> read() {
        if (this.size <= 0) {
            return null;
        }
        final long end = this.counter + this.size;
        do {
            this.counter++;
            // Reduce the long counter BEFORE narrowing to int: casting first yields a negative value once the
            // counter exceeds Integer.MAX_VALUE, which would produce a negative (invalid) list index.
            int index = (int) Math.floorMod(this.counter, (long) this.size);
            LogRecord<M> logRecord = this.tailers.get(index).read();
            if (logRecord != null) {
                return logRecord;
            }
        } while (this.counter < end);
        return null;
    }

    /**
     * Commits on the first wrapped tailer whose assignments contain the given partition.
     *
     * @param logPartition the partition to commit
     * @return the offset returned by the delegate tailer
     * @throws IllegalArgumentException if no wrapped tailer is assigned to the partition
     */
    @Override
    public LogOffset commit(LogPartition logPartition) {
        for (ChronicleLogTailer<M> chronicleLogTailer : this.tailers) {
            if (chronicleLogTailer.assignments().contains(logPartition)) {
                return chronicleLogTailer.commit(logPartition);
            }
        }
        throw new IllegalArgumentException("No tailer matching: " + logPartition);
    }

    /** Calls {@code commit()} on every wrapped tailer. */
    @Override
    public void commit() {
        this.tailers.forEach(ChronicleLogTailer::commit);
    }

    /** Calls {@code toEnd()} on every wrapped tailer. */
    @Override
    public void toEnd() {
        this.tailers.forEach(ChronicleLogTailer::toEnd);
    }

    /** Calls {@code toStart()} on every wrapped tailer. */
    @Override
    public void toStart() {
        this.tailers.forEach(ChronicleLogTailer::toStart);
    }

    /** Calls {@code toLastCommitted()} on every wrapped tailer. */
    @Override
    public void toLastCommitted() {
        this.tailers.forEach(ChronicleLogTailer::toLastCommitted);
    }

    /**
     * @return the partitions assigned to the wrapped tailers; this is the internal list, not a copy
     */
    @Override
    public Collection<LogPartition> assignments() {
        return this.logPartitions;
    }

    /** @return the group given at construction time */
    @Override
    public Name group() {
        return this.group;
    }

    /** @return {@code true} once {@link #close()} has been called */
    @Override
    public boolean closed() {
        return this.closed;
    }

    /** @return the codec of the first wrapped tailer, or {@code null} when there is no tailer */
    @Override
    public Codec<M> getCodec() {
        return this.codec;
    }

    /**
     * Seeks on the first wrapped tailer whose assignments contain the partition of the given offset.
     *
     * @param logOffset the offset to move to
     * @throws IllegalStateException if no wrapped tailer is assigned to the offset's partition
     */
    @Override
    public void seek(LogOffset logOffset) {
        for (ChronicleLogTailer<M> chronicleLogTailer : this.tailers) {
            if (chronicleLogTailer.assignments().contains(logOffset.partition())) {
                chronicleLogTailer.seek(logOffset);
                return;
            }
        }
        throw new IllegalStateException("Cannot seek, tailer " + this + " has no assignment for partition: " + logOffset);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public LogOffset offsetForTimestamp(LogPartition logPartition, long j) {
        throw new UnsupportedOperationException("ChronicleLog does not support seek by timestamp");
    }

    /** Calls {@code reset()} on every wrapped tailer. */
    @Override
    public void reset() {
        this.tailers.forEach(ChronicleLogTailer::reset);
    }

    /**
     * Calls {@code reset()} on the first wrapped tailer whose assignments contain the given partition.
     *
     * @param logPartition the partition to reset
     * @throws IllegalArgumentException if no wrapped tailer is assigned to the partition
     */
    @Override
    public void reset(LogPartition logPartition) {
        this.tailers.stream()
                .filter(chronicleLogTailer -> chronicleLogTailer.assignments().contains(logPartition))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException(String.format(
                        "Cannot reset, partition: %s not found on tailer assignments: %s", logPartition,
                        this.logPartitions)))
                .reset();
    }

    /**
     * Closes every wrapped tailer and marks this tailer as closed.
     * <p>
     * All tailers are closed even if some of them fail; the first failure is rethrown once every tailer has been
     * processed, with the following failures attached as suppressed exceptions.
     */
    @Override
    public void close() {
        RuntimeException failure = null;
        for (ChronicleLogTailer<M> chronicleLogTailer : this.tailers) {
            try {
                chronicleLogTailer.close();
            } catch (RuntimeException e) {
                if (failure == null) {
                    failure = e;
                } else if (e != failure) {
                    // An exception cannot suppress itself, hence the identity check.
                    failure.addSuppressed(e);
                }
            }
        }
        this.closed = true;
        if (failure != null) {
            throw failure;
        }
    }
}
