package org.nuxeo.ecm.platform.importer.mqueues.mqueues;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.ecm.platform.importer.mqueues.message.Message;
import org.nuxeo.ecm.platform.importer.mqueues.mqueues.MQueues;

/* loaded from: input_file:org/nuxeo/ecm/platform/importer/mqueues/mqueues/CQMQueues.class */
/**
 * {@link MQueues} implementation backed by Chronicle Queue.
 * <p>
 * Each queue is a {@link ChronicleQueue} stored in its own sub-directory of the base path, named
 * {@code Q-NN} where {@code NN} is the zero-padded queue index.
 *
 * @param <M> the type of message stored in the queues
 */
public class CQMQueues<M extends Message> implements MQueues<M> {
    private static final Log log = LogFactory.getLog(CQMQueues.class);
    private static final String QUEUE_PREFIX = "Q-";
    /** Upper bound, in milliseconds, of the pause between two polls in {@link #waitFor}. */
    private static final int POLL_INTERVAL_MS = 100;
    private final List<ChronicleQueue> queues;
    private final int nbQueues;
    private final File basePath;
    /** Tailers handed out by this instance, kept so that {@link #close()} can close them. */
    private final ConcurrentLinkedQueue<CQTailer<M>> tailers;

    /**
     * Creates a new set of queues.
     * <p>
     * If {@code basePath} is an existing directory it is deleted first, unless it directly contains
     * regular files that do not end with {@code .cq4}, in which case the call is rejected.
     *
     * @param basePath the directory that will hold the queues
     * @param size the number of queues to create
     * @throws IllegalArgumentException if the directory cannot be removed or created
     */
    public CQMQueues(File basePath, int size) {
        this(basePath, size, false);
    }

    /**
     * Opens an existing set of queues; the number of queues is the number of {@code Q-*}
     * sub-directories found directly under {@code basePath}.
     *
     * @param basePath the directory holding the queues
     * @throws IllegalArgumentException if the directory cannot be listed or holds no queue directory
     */
    public CQMQueues(File basePath) {
        this(basePath, 0, true);
    }

    private CQMQueues(File basePath, int size, boolean append) {
        this.tailers = new ConcurrentLinkedQueue<>();
        if (append) {
            this.nbQueues = findNbQueues(basePath);
        } else {
            resetBasePath(basePath);
            this.nbQueues = size;
        }
        this.basePath = basePath;
        this.queues = new ArrayList<>(this.nbQueues);
        log.info("Using chronicle queues in: " + basePath);
        for (int i = 0; i < this.nbQueues; i++) {
            File queueDir = new File(basePath, String.format("%s%02d", QUEUE_PREFIX, i));
            ChronicleQueue queue = SingleChronicleQueueBuilder.binary(queueDir).build();
            this.queues.add(queue);
            queue.file().mkdirs();
        }
        // Turns off the default assertion status of the system class loader.
        // NOTE(review): presumably meant to silence assertions in the queue library - confirm.
        ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(false);
    }

    @Override
    public int size() {
        return nbQueues;
    }

    /**
     * Appends a message to a queue.
     *
     * @param queue the index of the target queue
     * @param message the message to write
     * @return the offset built from the queue index and the last index appended
     */
    @Override
    public CQOffset append(int queue, M message) {
        ExcerptAppender appender = queues.get(queue).acquireAppender();
        appender.writeDocument(wire -> wire.write("msg").object(message));
        return new CQOffset(queue, appender.lastIndexAppended());
    }

    @Override
    public MQueues.Tailer<M> createTailer(int queue) {
        return addTailer(new CQTailer<>(basePath.toString(), queues.get(queue).createTailer(), queue));
    }

    @Override
    public MQueues.Tailer<M> createTailer(int queue, String nameSpace) {
        return addTailer(new CQTailer<>(basePath.toString(), queues.get(queue).createTailer(), queue, nameSpace));
    }

    /** Registers the tailer so that it gets closed by {@link #close()}. */
    private MQueues.Tailer<M> addTailer(CQTailer<M> tailer) {
        tailers.add(tailer);
        return tailer;
    }

    /**
     * Polls the committed offset of the default namespace until it reaches {@code offset} or the
     * timeout elapses.
     *
     * @param offset the offset to wait for; must be a {@link CQOffset}
     * @param timeout the maximum time to wait
     * @return {@code true} if the offset has been committed, {@code false} on timeout
     * @throws InterruptedException if the thread is interrupted while sleeping between two polls
     */
    @Override
    public boolean waitFor(Offset offset, Duration timeout) throws InterruptedException {
        CQOffset cqOffset = (CQOffset) offset;
        long offsetPosition = cqOffset.getOffset();
        try (CQOffsetTracker tracker = new CQOffsetTracker(basePath.toString(), cqOffset.getQueue(),
                CQTailer.DEFAULT_OFFSET_NAMESPACE)) {
            boolean processed = isProcessed(tracker, offsetPosition);
            if (processed) {
                return true;
            }
            long timeoutMs = timeout.toMillis();
            long deadline = System.currentTimeMillis() + timeoutMs;
            // Never sleep longer than the requested timeout.
            long delay = Math.min(POLL_INTERVAL_MS, timeoutMs);
            while (!processed && System.currentTimeMillis() < deadline) {
                Thread.sleep(delay);
                processed = isProcessed(tracker, offsetPosition);
            }
            return processed;
        }
    }

    /** Tells whether the last committed offset is positive and at or beyond {@code offset}. */
    private boolean isProcessed(CQOffsetTracker tracker, long offset) {
        long last = tracker.readLastCommittedOffset();
        return last > 0 && last >= offset;
    }

    /**
     * Closes every tailer created by this instance, then every queue. A tailer that fails to close
     * is logged and does not prevent the remaining resources from being closed.
     */
    @Override
    public void close() throws Exception {
        log.debug("Closing queue");
        tailers.stream().filter(Objects::nonNull).forEach(tailer -> {
            try {
                tailer.close();
            } catch (Exception e) {
                // Best effort: keep closing the others, but do not lose the cause.
                log.error("Failed to close tailer: " + tailer, e);
            }
        });
        tailers.clear();
        queues.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close);
        queues.clear();
    }

    /**
     * Counts the {@code Q-*} directories directly under {@code basePath}.
     *
     * @throws IllegalArgumentException if the directory cannot be listed or no queue directory exists
     */
    private int findNbQueues(File basePath) {
        try (Stream<Path> paths = Files.list(basePath.toPath())) {
            int count = (int) paths.filter(path -> Files.isDirectory(path)
                    && path.getFileName().toString().startsWith(QUEUE_PREFIX)).count();
            if (count == 0) {
                throw new IOException("No chronicles queues file found");
            }
            return count;
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e);
        }
    }

    /**
     * Removes {@code basePath} if it is a directory, then creates it again.
     *
     * @throws IllegalArgumentException if the directory cannot be created
     */
    private void resetBasePath(File basePath) {
        if (basePath.isDirectory()) {
            deleteQueueBasePath(basePath);
        }
        if (!basePath.mkdirs()) {
            String msg = "Can not create Chronicle Queues in: " + basePath;
            log.error(msg);
            throw new IllegalArgumentException(msg);
        }
    }

    /**
     * Deletes the directory recursively, after checking that it does not directly contain regular
     * files other than {@code .cq4} files.
     *
     * @throws IllegalArgumentException if unexpected files are found or the deletion fails
     */
    private void deleteQueueBasePath(File basePath) {
        try {
            log.info("Removing Chronicle Queues directory: " + basePath);
            // Sanity check before deleting anything; the listing is closed before the deletion starts.
            try (Stream<Path> paths = Files.list(basePath.toPath())) {
                long unknown = paths.filter(path -> Files.isRegularFile(path)
                        && !path.toString().endsWith(".cq4")).count();
                if (unknown > 0) {
                    String msg = "CQMQueues basePath: " + basePath
                            + " contains unkown files, please choose another basePath";
                    log.error(msg);
                    throw new IllegalArgumentException(msg);
                }
            }
            FileUtils.deleteDirectory(basePath);
        } catch (IOException e) {
            String msg = "Can not remove Chronicle Queues directory: " + basePath + " " + e.getMessage();
            log.error(msg, e);
            throw new IllegalArgumentException(msg, e);
        }
    }
}
