package org.nuxeo.lib.stream.log.chronicle;

import java.io.Externalizable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.NoCodec;
import org.nuxeo.lib.stream.log.LogOffset;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;
import org.nuxeo.lib.stream.log.internals.LogOffsetImpl;

/**
 * Log appender backed by Chronicle Queues: one queue per partition, each stored in its own
 * {@code P-NN} sub directory of a base directory, next to a {@code metadata.properties} file
 * describing the number of partitions, the retention and the block size.
 * <p>
 * Instances are obtained with the {@code create(...)} and {@code open(...)} static factories.
 *
 * @param <M> the type of the messages appended to the log
 */
public class ChronicleLogAppender<M extends Externalizable> implements CloseableLogAppender<M> {
    private static final Log log = LogFactory.getLog(ChronicleLogAppender.class);

    /** Prefix of the partition directory names. */
    protected static final String PARTITION_PREFIX = "P-";

    /** Name of the metadata file stored in the base directory. */
    protected static final String METADATA_FILE = "metadata.properties";

    /** Upper bound, in milliseconds, of the pause between two polls in {@link #waitFor}. */
    protected static final int POLL_INTERVAL_MS = 100;

    /** Maximum number of partitions accepted when creating a log. */
    protected static final int MAX_PARTITIONS = 100;

    /** Wire key used to store a message when no codec is configured. */
    public static final String MSG_KEY = "msg";

    /** Block size used for newly created logs and for logs without metadata. */
    public static final int CQ_BLOCK_SIZE = 4194304;

    public static final String RETENTION_KEY = "retention";

    public static final String PARTITIONS_KEY = "partitions";

    public static final String BLOCK_SIZE_KEY = "blockSize";

    protected final List<ChronicleQueue> partitions;

    protected final int nbPartitions;

    protected final File basePath;

    protected final int blockSize;

    protected final String name;

    /** Tailers created by this appender, closed together with it. */
    protected final ConcurrentLinkedQueue<ChronicleLogTailer<M>> tailers = new ConcurrentLinkedQueue<>();

    protected final ChronicleRetentionDuration retention;

    protected final Codec<M> codec;

    protected volatile boolean closed;

    /**
     * Opens an existing log.
     * <p>
     * The number of partitions, the retention and the block size are read from the metadata file
     * when it exists, otherwise they are guessed from the directory content and the requested retention.
     *
     * @param codec the codec used to encode messages, must not be {@code null}
     * @param basePath the directory of the log, must be a non empty directory
     * @param retention the requested retention; when it is disabled the stored retention is used in its
     *            disabled form, otherwise its roll cycle must be the same as the stored one
     * @throws NullPointerException if {@code codec} is {@code null}
     * @throws IllegalArgumentException if the path is invalid or the retention does not match
     */
    protected ChronicleLogAppender(Codec<M> codec, File basePath, ChronicleRetentionDuration retention) {
        Objects.requireNonNull(codec);
        if (!exists(basePath)) {
            throw new IllegalArgumentException("Cannot open Chronicle Queues, invalid path: " + basePath);
        }
        this.codec = codec;
        this.basePath = basePath;
        this.name = basePath.getName();

        Path metadataPath = getMetadataPath();
        Properties metadata = metadataPath.toFile().exists() ? readMetadata(metadataPath) : guessMetadata(retention);
        ChronicleRetentionDuration storedRetention = new ChronicleRetentionDuration(
                metadata.getProperty(RETENTION_KEY));
        if (retention.disable()) {
            this.retention = ChronicleRetentionDuration.disableOf(storedRetention);
        } else {
            if (retention.getRollCycle() != storedRetention.getRollCycle()) {
                throw new IllegalArgumentException(String.format("Cannot open Log %s: expecting retention: %s got: %s",
                        this.name, storedRetention, retention));
            }
            this.retention = retention;
        }
        this.nbPartitions = Integer.parseInt(metadata.getProperty(PARTITIONS_KEY));
        this.blockSize = Integer.parseInt(metadata.getProperty(BLOCK_SIZE_KEY));
        this.partitions = new ArrayList<>(this.nbPartitions);
        // Log only once the fields are assigned, otherwise toString() reports nulls and zeros.
        if (log.isDebugEnabled()) {
            log.debug("Opening: " + toString());
        }
        initPartitions(false);
    }

    /**
     * Creates a new log.
     *
     * @param codec the codec used to encode messages, must not be {@code null}
     * @param basePath the directory of the log, created if needed; must not already contain files
     * @param nbPartitions the number of partitions, between 1 and {@link #MAX_PARTITIONS}
     * @param retention the retention of the log
     * @throws NullPointerException if {@code codec} is {@code null}
     * @throws IllegalArgumentException if the number of partitions is out of range, if the log already
     *             exists or if a directory or the metadata file cannot be created
     */
    protected ChronicleLogAppender(Codec<M> codec, File basePath, int nbPartitions,
            ChronicleRetentionDuration retention) {
        // Validate every argument before touching the file system.
        Objects.requireNonNull(codec);
        if (nbPartitions <= 0) {
            throw new IllegalArgumentException("Number of partitions must be > 0");
        }
        if (nbPartitions > MAX_PARTITIONS) {
            throw new IllegalArgumentException(
                    String.format("Cannot create more than: %d partitions for log: %s, requested: %d",
                            MAX_PARTITIONS, basePath, nbPartitions));
        }
        if (exists(basePath)) {
            throw new IllegalArgumentException("Cannot create Chronicle Queues, already exists: " + basePath);
        }
        if (!basePath.exists() && !basePath.mkdirs()) {
            throw new IllegalArgumentException("Invalid path to create Chronicle Queues: " + basePath);
        }
        this.nbPartitions = nbPartitions;
        this.codec = codec;
        this.name = basePath.getName();
        this.basePath = basePath;
        this.retention = retention;
        this.partitions = new ArrayList<>(this.nbPartitions);
        this.blockSize = CQ_BLOCK_SIZE;
        if (log.isDebugEnabled()) {
            log.debug("Creating: " + toString());
        }
        initPartitions(true);
        saveMetadata();
    }

    /**
     * Builds one Chronicle Queue per partition and registers it in {@link #partitions}.
     *
     * @param create when {@code true} the partition directories are created first
     * @throws IllegalArgumentException if a partition directory cannot be created
     */
    protected void initPartitions(boolean create) {
        for (int partition = 0; partition < this.nbPartitions; partition++) {
            Path partitionPath = Paths.get(getBasePath(), String.format("%s%02d", PARTITION_PREFIX, partition));
            if (create) {
                try {
                    Files.createDirectories(partitionPath);
                } catch (IOException e) {
                    throw new IllegalArgumentException("Cannot create directory: " + partitionPath.toAbsolutePath(),
                            e);
                }
            }
            SingleChronicleQueueBuilder builder = SingleChronicleQueueBuilder.binary(partitionPath)
                                                                             .rollCycle(this.retention.getRollCycle())
                                                                             .blockSize(this.blockSize);
            ChronicleRetentionListener retentionListener = null;
            if (!this.retention.disable()) {
                retentionListener = new ChronicleRetentionListener(this.retention);
                builder.storeFileListener(retentionListener);
            }
            SingleChronicleQueue queue = builder.build();
            this.partitions.add(queue);
            // The listener has to be registered on the builder before the queue exists,
            // so the queue is handed to it only once built.
            if (retentionListener != null) {
                retentionListener.setQueue(queue);
            }
        }
    }

    /**
     * Writes the metadata file of a newly created log.
     *
     * @throws IllegalArgumentException if the file cannot be created, including when it already exists
     */
    protected void saveMetadata() {
        Path metadataPath = getMetadataPath();
        StringBuilder content = new StringBuilder();
        content.append(String.format("# Log created %s%n", Instant.now().toString()));
        content.append(String.format("%s=%d%n", PARTITIONS_KEY, this.nbPartitions));
        content.append(String.format("%s=%s%n", RETENTION_KEY, this.retention));
        content.append(String.format("%s=%d%n", BLOCK_SIZE_KEY, this.blockSize));
        try {
            // Properties.load(InputStream) decodes ISO-8859-1: write with the same charset
            // instead of relying on the platform default.
            Files.write(metadataPath, content.toString().getBytes(StandardCharsets.ISO_8859_1),
                    StandardOpenOption.CREATE_NEW);
            if (log.isDebugEnabled()) {
                // The Throwable is only there to get the creation stack trace in the debug output.
                log.debug(String.format("Created Log: %s%n%s", this.name, content.toString()),
                        new Throwable("here"));
            }
        } catch (IOException e) {
            throw new IllegalArgumentException("Unable to create metadata file: " + metadataPath, e);
        }
    }

    /**
     * Returns the path of the metadata file inside the base directory.
     */
    protected Path getMetadataPath() {
        return this.basePath.toPath().resolve(METADATA_FILE);
    }

    /**
     * Loads the properties stored in a metadata file.
     *
     * @param path the path of the metadata file
     * @return the loaded properties
     * @throws IllegalArgumentException if the file cannot be read
     */
    protected static Properties readMetadata(Path path) {
        Properties metadata = new Properties();
        // try-with-resources so that the stream is closed even when load() fails
        try (InputStream in = Files.newInputStream(path)) {
            metadata.load(in);
        } catch (IOException e) {
            throw new IllegalArgumentException("Cannot open Log metadata file: " + path, e);
        }
        return metadata;
    }

    /**
     * Builds the metadata of a log that has no metadata file: the partitions are counted from the
     * base directory, the retention is the requested one and the block size is {@link #CQ_BLOCK_SIZE}.
     *
     * @param retention the requested retention
     * @return the guessed properties
     */
    protected Properties guessMetadata(ChronicleRetentionDuration retention) {
        Properties metadata = new Properties();
        metadata.setProperty(PARTITIONS_KEY, Integer.toString(discoverPartitions(this.basePath.toPath())));
        metadata.setProperty(RETENTION_KEY, retention.getRetention());
        metadata.setProperty(BLOCK_SIZE_KEY, Integer.toString(CQ_BLOCK_SIZE));
        return metadata;
    }

    /**
     * Tells whether the given path is a directory that contains at least one entry.
     *
     * @param basePath the path to check
     * @return {@code true} for a non empty directory, {@code false} otherwise, including when the
     *         directory content cannot be listed
     */
    protected static boolean exists(File basePath) {
        if (!basePath.isDirectory()) {
            return false;
        }
        // File.list() returns null when an I/O error occurs
        String[] entries = basePath.list();
        return entries != null && entries.length > 0;
    }

    /**
     * Creates a new log with the given retention.
     *
     * @see #ChronicleLogAppender(Codec, File, int, ChronicleRetentionDuration)
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> create(Codec<M> codec, File basePath,
            int nbPartitions, ChronicleRetentionDuration retention) {
        return new ChronicleLogAppender<>(codec, basePath, nbPartitions, retention);
    }

    /**
     * Creates a new log using {@link ChronicleRetentionDuration#NONE} as retention.
     *
     * @see #ChronicleLogAppender(Codec, File, int, ChronicleRetentionDuration)
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> create(Codec<M> codec, File basePath,
            int nbPartitions) {
        return new ChronicleLogAppender<>(codec, basePath, nbPartitions, ChronicleRetentionDuration.NONE);
    }

    /**
     * Opens an existing log using {@link ChronicleRetentionDuration#NONE} as requested retention.
     *
     * @see #ChronicleLogAppender(Codec, File, ChronicleRetentionDuration)
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> open(Codec<M> codec, File basePath) {
        return new ChronicleLogAppender<>(codec, basePath, ChronicleRetentionDuration.NONE);
    }

    /**
     * Opens an existing log with the given requested retention.
     *
     * @see #ChronicleLogAppender(Codec, File, ChronicleRetentionDuration)
     */
    public static <M extends Externalizable> ChronicleLogAppender<M> open(Codec<M> codec, File basePath,
            ChronicleRetentionDuration retention) {
        return new ChronicleLogAppender<>(codec, basePath, retention);
    }

    /**
     * Returns the base directory of the log as a string.
     */
    public String getBasePath() {
        return this.basePath.getPath();
    }

    /**
     * Returns the name of the log, which is the name of its base directory.
     */
    @Override
    public String name() {
        return this.name;
    }

    /**
     * Returns the number of partitions of the log.
     */
    @Override
    public int size() {
        return this.nbPartitions;
    }

    /**
     * Appends a message to a partition.
     * <p>
     * Without codec the message is written as an object under the {@link #MSG_KEY} key, otherwise the
     * bytes produced by the codec are written.
     *
     * @param partition the index of the partition
     * @param message the message to append
     * @return the offset of the appended message
     */
    @Override
    public LogOffset append(int partition, M message) {
        ExcerptAppender appender = this.partitions.get(partition).acquireAppender();
        if (NoCodec.NO_CODEC.equals(this.codec)) {
            appender.writeDocument(wire -> {
                wire.write(MSG_KEY).object(message);
            });
        } else {
            appender.writeDocument(wire -> {
                wire.write().bytes(this.codec.encode(message));
            });
        }
        LogOffset offset = new LogOffsetImpl(this.name, partition, appender.lastIndexAppended());
        if (log.isDebugEnabled()) {
            log.debug(String.format("append to %s, value: %s", offset, message));
        }
        return offset;
    }

    /**
     * Creates a tailer on a partition; the tailer is tracked and closed when this appender is closed.
     *
     * @param partition the partition to read
     * @param group the consumer group, passed as is to the tailer
     * @param codec the codec used by the tailer
     * @return the new tailer
     */
    public LogTailer<M> createTailer(LogPartition partition, String group, Codec<M> codec) {
        ChronicleQueue queue = this.partitions.get(partition.partition());
        return addTailer(new ChronicleLogTailer<>(codec, this.basePath.toString(), queue.createTailer(), partition,
                group, this.retention));
    }

    /**
     * Returns the index reached by a fresh tailer moved to the end of the partition.
     *
     * @param partition the index of the partition
     */
    public long endOffset(int partition) {
        return this.partitions.get(partition).createTailer().toEnd().index();
    }

    /**
     * Returns the first index of the partition, or {@code 0} when the queue reports
     * {@code Long.MAX_VALUE} (presumably the marker of an empty queue, to be confirmed against the
     * Chronicle Queue documentation).
     *
     * @param partition the index of the partition
     */
    public long firstOffset(int partition) {
        long first = this.partitions.get(partition).firstIndex();
        return first == Long.MAX_VALUE ? 0L : first;
    }

    /**
     * Counts the messages of a partition between two indexes.
     *
     * @param partition the index of the partition
     * @param lowerOffset the lower index given to the queue
     * @param upperOffset the upper index given to the queue
     * @return the number of excerpts reported by the queue, or {@code 0} when the queue raises an
     *         {@code IllegalStateException}
     */
    public long countMessages(int partition, long lowerOffset, long upperOffset) {
        SingleChronicleQueue queue = (SingleChronicleQueue) this.partitions.get(partition);
        try {
            return queue.countExcerpts(lowerOffset, upperOffset);
        } catch (IllegalStateException e) {
            // Deliberate best effort: the range cannot be counted, report no message.
            if (log.isDebugEnabled()) {
                log.debug("Missing low cycle file: " + lowerOffset + " for queue: " + queue + " " + e.getMessage());
            }
            return 0L;
        }
    }

    /**
     * Registers a tailer so that it gets closed with this appender.
     *
     * @param tailer the tailer to register
     * @return the given tailer
     */
    protected LogTailer<M> addTailer(ChronicleLogTailer<M> tailer) {
        this.tailers.add(tailer);
        return tailer;
    }

    /**
     * Waits until the given offset is committed by a consumer group, polling the committed offset
     * at most every {@link #POLL_INTERVAL_MS} milliseconds.
     *
     * @param offset the offset to wait for
     * @param group the consumer group
     * @param timeout the maximum time to wait
     * @return {@code true} if the offset has been committed before the timeout
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    public boolean waitFor(LogOffset offset, String group, Duration timeout) throws InterruptedException {
        long target = offset.offset();
        try (ChronicleLogOffsetTracker tracker = new ChronicleLogOffsetTracker(this.basePath.toString(),
                offset.partition().partition(), group, ChronicleRetentionDuration.disableOf(this.retention))) {
            boolean processed = isProcessed(tracker, target);
            if (processed) {
                return true;
            }
            long timeoutMs = timeout.toMillis();
            long deadline = System.currentTimeMillis() + timeoutMs;
            long pauseMs = Math.min(POLL_INTERVAL_MS, timeoutMs);
            while (!processed && System.currentTimeMillis() < deadline) {
                Thread.sleep(pauseMs);
                processed = isProcessed(tracker, target);
            }
            return processed;
        }
    }

    /**
     * Tells whether {@link #close()} has completed.
     */
    @Override
    public boolean closed() {
        return this.closed;
    }

    /**
     * Returns the codec used to encode the messages.
     */
    @Override
    public Codec<M> getCodec() {
        return this.codec;
    }

    /**
     * Tells whether the last committed offset of the tracker is positive and has reached the given offset.
     *
     * @param tracker the offset tracker of the consumer group
     * @param offset the offset to reach
     */
    protected boolean isProcessed(ChronicleLogOffsetTracker tracker, long offset) {
        long committed = tracker.readLastCommittedOffset();
        return committed > 0 && committed >= offset;
    }

    /**
     * Closes the registered tailers, then the partition queues, and marks the appender as closed.
     */
    @Override
    public void close() {
        if (log.isDebugEnabled()) {
            log.debug("Closing: " + toString());
        }
        this.tailers.stream().filter(Objects::nonNull).forEach(ChronicleLogTailer::close);
        this.tailers.clear();
        this.partitions.stream().filter(Objects::nonNull).forEach(ChronicleQueue::close);
        this.partitions.clear();
        this.closed = true;
    }

    /**
     * Returns the number of partitions of the log stored in the given directory, reading the metadata
     * file when present and counting the partition directories otherwise.
     *
     * @param basePath the base directory of the log
     * @throws IllegalArgumentException if the metadata cannot be read or no partition is found
     */
    public static int partitions(Path basePath) {
        Path metadataPath = basePath.resolve(METADATA_FILE);
        if (metadataPath.toFile().exists()) {
            return Integer.parseInt(readMetadata(metadataPath).getProperty(PARTITIONS_KEY));
        }
        return discoverPartitions(basePath);
    }

    /**
     * Counts the partition directories found directly under the given directory.
     *
     * @param basePath the base directory of the log
     * @return the number of partition directories, always greater than 0
     * @throws IllegalArgumentException if the directory cannot be listed or contains no partition
     */
    public static int discoverPartitions(Path basePath) {
        // Files.list keeps a directory handle open until the stream is closed
        try (Stream<Path> entries = Files.list(basePath)) {
            int count = (int) entries.filter(ChronicleLogAppender::isPartitionDirectory).count();
            if (count == 0) {
                throw new IOException("No chronicles queues file found");
            }
            return count;
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid basePath for queue: " + basePath, e);
        }
    }

    /**
     * Tells whether the path is a directory whose name starts with {@link #PARTITION_PREFIX}.
     */
    protected static boolean isPartitionDirectory(Path path) {
        return path.toFile().isDirectory() && path.getFileName().toString().startsWith(PARTITION_PREFIX);
    }

    @Override
    public String toString() {
        return "ChronicleLogAppender{nbPartitions=" + this.nbPartitions + ", basePath=" + this.basePath + ", name='"
                + this.name + "', retention=" + this.retention + ", closed=" + this.closed + ", codec=" + this.codec
                + '}';
    }

    /**
     * Returns the retention in use for this log.
     */
    public ChronicleRetentionDuration getRetention() {
        return this.retention;
    }
}
