package org.nuxeo.ecm.platform.importer.mqueues.mqueues;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.openhft.chronicle.queue.ExcerptTailer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.importer.mqueues.message.Message;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues;

/* loaded from: input_file:org/nuxeo/ecm/platform/importer/mqueues/mqueues/CQTailer.class */
public class CQTailer<M extends Message> implements MQueues.Tailer<M> {
    private static final long POLL_INTERVAL_MS = 100;
    public static final String DEFAULT_OFFSET_NAMESPACE = "default";
    private final String basePath;
    private final ExcerptTailer tailer;
    private final String nameSpace;
    private final int queueIndex;
    private final CQOffsetTracker offsetTracker;
    private boolean closed;
    private static final Log log = LogFactory.getLog(CQTailer.class);
    private static final Set<String> indexNamespace = Collections.newSetFromMap(new ConcurrentHashMap());

    public CQTailer(String str, ExcerptTailer excerptTailer, int i) {
        this(str, excerptTailer, i, null);
    }

    public CQTailer(String str, ExcerptTailer excerptTailer, int i, String str2) {
        this.closed = false;
        this.basePath = str;
        this.tailer = excerptTailer;
        this.queueIndex = i;
        if (str2 == null) {
            this.nameSpace = DEFAULT_OFFSET_NAMESPACE;
        } else {
            this.nameSpace = str2;
        }
        registerTailer();
        this.offsetTracker = new CQOffsetTracker(str, i, this.nameSpace);
        toLastCommitted();
    }

    private void registerTailer() {
        String tailerKey = getTailerKey();
        if (!indexNamespace.add(tailerKey)) {
            throw new IllegalArgumentException("A tailer for this queue and namespace already exists: " + tailerKey);
        }
    }

    private void unregisterTailer() {
        indexNamespace.remove(getTailerKey());
    }

    private String getTailerKey() {
        return this.basePath + " " + this.queueIndex + " " + this.nameSpace;
    }

    @Override // org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues.Tailer
    public M read(Duration duration) throws InterruptedException {
        M read = read();
        if (read != null) {
            return read;
        }
        long millis = duration.toMillis();
        long currentTimeMillis = System.currentTimeMillis() + millis;
        long min = Math.min(POLL_INTERVAL_MS, millis);
        while (read == null && System.currentTimeMillis() < currentTimeMillis) {
            Thread.sleep(min);
            read = read();
        }
        return read;
    }

    private M read() {
        if (this.closed) {
            throw new IllegalStateException("The tailer has been closed.");
        }
        ArrayList arrayList = new ArrayList(1);
        if (this.tailer.readDocument(wireIn -> {
            arrayList.add((Message) wireIn.read("msg").object());
        })) {
            return (M) arrayList.get(0);
        }
        return null;
    }

    @Override // org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues.Tailer
    public Offset commit() {
        long index = this.tailer.index();
        this.offsetTracker.commit(index);
        if (log.isTraceEnabled()) {
            log.trace(String.format("queue-%02d commit offset: %d", Integer.valueOf(this.queueIndex), Long.valueOf(index)));
        }
        return new CQOffset(this.queueIndex, index);
    }

    @Override // org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues.Tailer
    public void toEnd() {
        log.debug(String.format("queue-%02d toEnd", Integer.valueOf(this.queueIndex)));
        this.tailer.toEnd();
    }

    @Override // org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues.Tailer
    public void toStart() {
        log.debug(String.format("queue-%02d toStart", Integer.valueOf(this.queueIndex)));
        this.tailer.toStart();
    }

    @Override // org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues.Tailer
    public void toLastCommitted() {
        long lastCommittedOffset = this.offsetTracker.getLastCommittedOffset();
        if (lastCommittedOffset > 0) {
            log.debug(String.format("queue-%02d toLastCommitted found: %d", Integer.valueOf(this.queueIndex), Long.valueOf(lastCommittedOffset)));
            this.tailer.moveToIndex(lastCommittedOffset);
        } else {
            log.debug(String.format("queue-%02d toLastCommitted not found, run from beginning", Integer.valueOf(this.queueIndex)));
            this.tailer.toStart();
        }
    }

    @Override // org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues.Tailer
    public int getQueue() {
        return this.queueIndex;
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.offsetTracker.close();
        unregisterTailer();
        this.closed = true;
    }
}
