package org.nuxeo.lib.stream.log.chronicle;

import java.io.Externalizable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.table.SingleTableStore;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.nuxeo.lib.stream.codec.Codec;
import org.nuxeo.lib.stream.codec.NoCodec;
import org.nuxeo.lib.stream.log.LogLag;
import org.nuxeo.lib.stream.log.LogPartition;
import org.nuxeo.lib.stream.log.LogTailer;
import org.nuxeo.lib.stream.log.RebalanceListener;
import org.nuxeo.lib.stream.log.internals.AbstractLogManager;
import org.nuxeo.lib.stream.log.internals.CloseableLogAppender;

/* loaded from: input_file:org/nuxeo/lib/stream/log/chronicle/ChronicleLogManager.class */
public class ChronicleLogManager extends AbstractLogManager {
    private static final Log log = LogFactory.getLog(ChronicleLogManager.class);
    protected final Path basePath;
    protected final ChronicleRetentionDuration retention;

    public ChronicleLogManager(Path path) {
        this(path, null);
    }

    public ChronicleLogManager(Path path, String str) {
        this.basePath = path;
        this.retention = new ChronicleRetentionDuration(str);
    }

    protected static void deleteQueueBasePath(Path path) {
        try {
            log.info("Removing Chronicle Queues directory: " + path);
            Stream<Path> list = Files.list(path);
            Throwable th = null;
            try {
                if (((int) list.filter(path2 -> {
                    return path2.toFile().isFile() && !isChronicleLogFile(path2);
                }).count()) > 0) {
                    String str = "ChronicleLog basePath: " + path + " contains unknown files, please choose another basePath";
                    log.error(str);
                    throw new IllegalArgumentException(str);
                }
                if (list != null) {
                    if (0 != 0) {
                        try {
                            list.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        list.close();
                    }
                }
                FileUtils.deleteDirectory(path.toFile());
            } finally {
            }
        } catch (IOException e) {
            String str2 = "Cannot remove Chronicle Queues directory: " + path + " " + e.getMessage();
            log.error(str2, e);
            throw new IllegalArgumentException(str2, e);
        }
    }

    protected static boolean isChronicleLogFile(Path path) {
        String path2 = path.getFileName().toString();
        return path2.endsWith(SingleChronicleQueue.SUFFIX) || path2.endsWith(SingleTableStore.SUFFIX) || "metadata.properties".equals(path2);
    }

    public String getBasePath() {
        return this.basePath.toAbsolutePath().toString();
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public boolean exists(String str) {
        try {
            Stream<Path> list = Files.list(this.basePath.resolve(str));
            Throwable th = null;
            try {
                try {
                    boolean z = list.count() > 0;
                    if (list != null) {
                        if (0 != 0) {
                            try {
                                list.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            list.close();
                        }
                    }
                    return z;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            return false;
        }
    }

    @Override // org.nuxeo.lib.stream.log.internals.AbstractLogManager
    public void create(String str, int i) {
        ChronicleLogAppender.create(NoCodec.NO_CODEC, this.basePath.resolve(str).toFile(), i, this.retention).close();
    }

    @Override // org.nuxeo.lib.stream.log.internals.AbstractLogManager
    protected int getSize(String str) {
        return ChronicleLogAppender.partitions(this.basePath.resolve(str));
    }

    @Override // org.nuxeo.lib.stream.log.internals.AbstractLogManager, org.nuxeo.lib.stream.log.LogManager
    public boolean delete(String str) {
        Path resolve = this.basePath.resolve(str);
        if (!resolve.toFile().isDirectory()) {
            return false;
        }
        deleteQueueBasePath(resolve);
        return true;
    }

    protected LogLag getLagForPartition(String str, int i, String str2) {
        long readLastCommittedOffset;
        Path resolve = this.basePath.resolve(str);
        if (ChronicleLogOffsetTracker.exists(resolve, str2)) {
            ChronicleLogOffsetTracker chronicleLogOffsetTracker = new ChronicleLogOffsetTracker(resolve.toString(), i, str2, ChronicleRetentionDuration.disableOf(this.retention));
            Throwable th = null;
            try {
                try {
                    readLastCommittedOffset = chronicleLogOffsetTracker.readLastCommittedOffset();
                    if (chronicleLogOffsetTracker != null) {
                        if (0 != 0) {
                            try {
                                chronicleLogOffsetTracker.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            chronicleLogOffsetTracker.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (chronicleLogOffsetTracker != null) {
                    if (th != null) {
                        try {
                            chronicleLogOffsetTracker.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        chronicleLogOffsetTracker.close();
                    }
                }
                throw th3;
            }
        } else {
            readLastCommittedOffset = 0;
        }
        ChronicleLogAppender open = ChronicleLogAppender.open(NoCodec.NO_CODEC, this.basePath.resolve(str).toFile());
        Throwable th5 = null;
        try {
            try {
                long endOffset = open.endOffset(i);
                if (readLastCommittedOffset == 0) {
                    readLastCommittedOffset = open.firstOffset(i);
                }
                LogLag logLag = new LogLag(readLastCommittedOffset, endOffset, open.countMessages(i, readLastCommittedOffset, endOffset), open.countMessages(i, open.firstOffset(i), endOffset));
                if (open != null) {
                    if (0 != 0) {
                        try {
                            open.close();
                        } catch (Throwable th6) {
                            th5.addSuppressed(th6);
                        }
                    } else {
                        open.close();
                    }
                }
                return logLag;
            } finally {
            }
        } catch (Throwable th7) {
            if (open != null) {
                if (th5 != null) {
                    try {
                        open.close();
                    } catch (Throwable th8) {
                        th5.addSuppressed(th8);
                    }
                } else {
                    open.close();
                }
            }
            throw th7;
        }
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public List<LogLag> getLagPerPartition(String str, String str2) {
        int size = size(str);
        ArrayList arrayList = new ArrayList(size);
        for (int i = 0; i < size; i++) {
            arrayList.add(getLagForPartition(str, i, str2));
        }
        return arrayList;
    }

    public String toString() {
        return "ChronicleLogManager{basePath=" + this.basePath + ", retention='" + this.retention + "'}";
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public List<String> listAll() {
        try {
            Stream<Path> list = Files.list(this.basePath);
            Throwable th = null;
            try {
                List<String> list2 = (List) list.filter(path -> {
                    return Files.isDirectory(path, new LinkOption[0]);
                }).map((v0) -> {
                    return v0.getFileName();
                }).map((v0) -> {
                    return v0.toString();
                }).collect(Collectors.toList());
                if (list != null) {
                    if (0 != 0) {
                        try {
                            list.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        list.close();
                    }
                }
                return list2;
            } finally {
            }
        } catch (IOException e) {
            throw new IllegalArgumentException("Invalid base path: " + this.basePath, e);
        }
    }

    @Override // org.nuxeo.lib.stream.log.LogManager
    public List<String> listConsumerGroups(String str) {
        Path resolve = this.basePath.resolve(str);
        if (!resolve.toFile().exists()) {
            throw new IllegalArgumentException("Unknown Log: " + str);
        }
        try {
            Stream<Path> list = Files.list(resolve);
            Throwable th = null;
            try {
                try {
                    List<String> list2 = (List) list.filter(path -> {
                        return Files.isDirectory(path, new LinkOption[0]);
                    }).map((v0) -> {
                        return v0.getFileName();
                    }).map((v0) -> {
                        return v0.toString();
                    }).filter(ChronicleLogOffsetTracker::isOffsetTracker).map(ChronicleLogOffsetTracker::getGroupFromDirectory).collect(Collectors.toList());
                    if (list != null) {
                        if (0 != 0) {
                            try {
                                list.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            list.close();
                        }
                    }
                    return list2;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            throw new IllegalArgumentException("Cannot access Log: " + str, e);
        }
    }

    @Override // org.nuxeo.lib.stream.log.internals.AbstractLogManager
    public <M extends Externalizable> CloseableLogAppender<M> createAppender(String str, Codec<M> codec) {
        return ChronicleLogAppender.open(codec, this.basePath.resolve(str).toFile(), this.retention);
    }

    @Override // org.nuxeo.lib.stream.log.internals.AbstractLogManager
    protected <M extends Externalizable> LogTailer<M> doCreateTailer(Collection<LogPartition> collection, String str, Codec<M> codec) {
        ArrayList arrayList = new ArrayList(collection.size());
        collection.forEach(logPartition -> {
            arrayList.add((ChronicleLogTailer) ((ChronicleLogAppender) getAppender(logPartition.name(), codec)).createTailer(logPartition, str, codec));
        });
        return arrayList.size() == 1 ? (LogTailer) arrayList.iterator().next() : new ChronicleCompoundLogTailer(arrayList, str);
    }

    @Override // org.nuxeo.lib.stream.log.internals.AbstractLogManager
    protected <M extends Externalizable> LogTailer<M> doSubscribe(String str, Collection<String> collection, RebalanceListener rebalanceListener, Codec<M> codec) {
        throw new UnsupportedOperationException("subscribe is not supported by Chronicle implementation");
    }
}
