/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.metadata.util;

import java.io.File;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.message.SnapshotFooterRecord;
import org.apache.kafka.common.message.SnapshotHeaderRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.metadata.util.ClusterMetadataSource;
import org.apache.kafka.metadata.util.KRaftBatchFileReader;
import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class LocalDirectorySource
implements ClusterMetadataSource {
    private static final Logger log = LoggerFactory.getLogger(LocalDirectorySource.class);
    private static final String LOG_SUFFIX = ".log";
    private final String metadataDirectory;
    private final KafkaEventQueue queue;
    private final CompletableFuture<Void> caughtUpFuture = new CompletableFuture();
    private RaftClient.Listener<ApiMessageAndVersion> listener = null;
    private long previousBaseOffset = -1L;

    private String selectNextFileName(boolean considerSnapshots) throws Exception {
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(Paths.get(this.metadataDirectory, new String[0]));){
            String string = LocalDirectorySource.selectNextFileName(considerSnapshots, this.previousBaseOffset, stream.iterator());
            return string;
        }
    }

    static String selectNextFileName(boolean considerSnapshots, long lastReadOffset, Iterator<Path> iterator) {
        String best = "";
        long bestOffset = Long.MAX_VALUE;
        while (iterator.hasNext()) {
            long nextOffset;
            Path next = iterator.next();
            String name = "" + next.getFileName();
            if (name.endsWith(LOG_SUFFIX)) {
                if (best.endsWith(".checkpoint") || (nextOffset = LocalDirectorySource.extractLogFileOffsetFromName(name)) >= bestOffset || nextOffset <= lastReadOffset) continue;
                best = name;
                bestOffset = nextOffset;
                continue;
            }
            if (!name.endsWith(".checkpoint") || !considerSnapshots) continue;
            nextOffset = LocalDirectorySource.extractSnapshotFileOffsetFromName(name);
            if (best.endsWith(".checkpoint")) {
                if (nextOffset <= bestOffset) continue;
                best = name;
                bestOffset = nextOffset;
                continue;
            }
            best = name;
            bestOffset = nextOffset;
        }
        return best;
    }

    static long extractLogFileOffsetFromName(String name) {
        int dotIndex = name.indexOf(".");
        if (dotIndex < 0) {
            throw new RuntimeException("No dot found in .log file name.");
        }
        return Long.parseLong(name.substring(0, dotIndex));
    }

    static long extractSnapshotFileOffsetFromName(String name) {
        int dashIndex = name.indexOf("-");
        if (dashIndex < 0) {
            throw new RuntimeException("No dash found in .snapshot file name.");
        }
        return Long.parseLong(name.substring(0, dashIndex));
    }

    private void scheduleLoadFile(String name) {
        if (name.endsWith(".checkpoint")) {
            log.warn("Loading snapshot file " + name);
            this.queue.append((EventQueue.Event)new LoadSnapshotFile(name));
        } else {
            log.warn("Loading log file " + name);
            this.queue.append((EventQueue.Event)new LoadLogFile(name));
        }
    }

    static OffsetAndEpoch calculateSnapshotId(String name) {
        int dashIndex = name.indexOf("-");
        if (dashIndex < 0) {
            throw new RuntimeException("Unable to find the first dash in the snapshot name " + name);
        }
        int periodIndex = name.indexOf(".");
        if (periodIndex < 0) {
            throw new RuntimeException("Unable to find the first period in the snapshot name " + name);
        }
        long offset = Long.parseLong(name.substring(0, dashIndex));
        int epoch = Integer.parseInt(name.substring(dashIndex + 1, periodIndex));
        return new OffsetAndEpoch(offset, epoch);
    }

    static Optional<LeaderAndEpoch> parseControlRecords(Batch<ApiMessageAndVersion> batch) {
        Optional<LeaderAndEpoch> leaderChange = Optional.empty();
        for (ApiMessageAndVersion messageAndVersion : batch) {
            if (messageAndVersion.message() instanceof LeaderChangeMessage) {
                LeaderChangeMessage message = (LeaderChangeMessage)messageAndVersion.message();
                if (message.leaderId() >= 0) {
                    leaderChange = Optional.of(new LeaderAndEpoch(OptionalInt.of(message.leaderId()), batch.epoch()));
                    continue;
                }
                leaderChange = Optional.of(new LeaderAndEpoch(OptionalInt.empty(), batch.epoch()));
                continue;
            }
            if (messageAndVersion.message() instanceof SnapshotHeaderRecord || messageAndVersion.message() instanceof SnapshotFooterRecord) continue;
            throw new RuntimeException("Unknown control record type " + messageAndVersion.message().getClass().getCanonicalName());
        }
        return leaderChange;
    }

    public LocalDirectorySource(String metadataDirectory) {
        this.metadataDirectory = metadataDirectory;
        this.queue = new KafkaEventQueue(Time.SYSTEM, new LogContext("[LocalDirectorySource] "), "LocalDirectorySource_");
    }

    @Override
    public void start(ClusterMetadataSource.Listener<ApiMessageAndVersion> listener) throws Exception {
        SelectInitialFile event = new SelectInitialFile(listener);
        this.queue.append((EventQueue.Event)event);
        event.future().get();
    }

    @Override
    public CompletableFuture<Void> caughtUpFuture() {
        return this.caughtUpFuture;
    }

    public void beginShutdown(String reason) {
        this.queue.beginShutdown(reason, (EventQueue.Event)new Shutdown(reason));
    }

    @Override
    public void close() throws Exception {
        this.beginShutdown("closing");
        this.queue.close();
    }

    @Override
    public String toString() {
        return "ClusterMetadataLogDirectoryReader(metadataDirectory=" + this.metadataDirectory + ")";
    }

    class Shutdown
    implements EventQueue.Event {
        private final String reason;

        Shutdown(String reason) {
            this.reason = reason;
        }

        public void run() throws Exception {
            if (this.reason.equals("done")) {
                LocalDirectorySource.this.caughtUpFuture.complete(null);
            } else {
                LocalDirectorySource.this.caughtUpFuture.completeExceptionally(new RuntimeException(this.reason));
            }
            if (LocalDirectorySource.this.listener != null) {
                LocalDirectorySource.this.listener.beginShutdown();
            }
        }

        public void handleException(Throwable e) {
            log.error("shutdown error", e);
        }
    }

    class LoadLogFile
    implements EventQueue.Event {
        private final String path;

        LoadLogFile(String name) {
            this.path = LocalDirectorySource.this.metadataDirectory + File.separator + name;
        }

        public void run() throws Exception {
            try (KRaftBatchFileReader reader = new KRaftBatchFileReader.Builder().setPath(this.path).build();){
                while (reader.hasNext()) {
                    KRaftBatchFileReader.KRaftBatch kraftBatch = reader.next();
                    if (kraftBatch.isControl()) {
                        Optional<LeaderAndEpoch> leaderChange = LocalDirectorySource.parseControlRecords(kraftBatch.batch());
                        if (leaderChange.isPresent()) {
                            LocalDirectorySource.this.listener.handleLeaderChange(leaderChange.get());
                        }
                    } else {
                        Batch<ApiMessageAndVersion> batch = kraftBatch.batch();
                        LocalDirectorySource.this.listener.handleCommit((BatchReader)MemoryBatchReader.of(Collections.singletonList(Batch.data((long)batch.baseOffset(), (int)batch.epoch(), (long)batch.appendTimestamp(), (int)batch.sizeInBytes(), (List)batch.records())), __ -> {}));
                    }
                    LocalDirectorySource.this.previousBaseOffset = kraftBatch.batch().baseOffset();
                }
            }
            LocalDirectorySource.this.queue.append((EventQueue.Event)new SelectNextFile());
        }

        public void handleException(Throwable e) {
            LocalDirectorySource.this.beginShutdown("error reading " + this.path);
        }
    }

    class LoadSnapshotFile
    implements EventQueue.Event {
        private final String path;
        private final OffsetAndEpoch snapshotId;

        LoadSnapshotFile(String name) {
            this.path = LocalDirectorySource.this.metadataDirectory + File.separator + name;
            this.snapshotId = LocalDirectorySource.calculateSnapshotId(name);
        }

        public void run() throws Exception {
            final AtomicReference<Object> leaderChange = new AtomicReference<Object>(null);
            final KRaftBatchFileReader reader = new KRaftBatchFileReader.Builder().setPath(this.path).build();
            LocalDirectorySource.this.listener.handleSnapshot((SnapshotReader)new RecordsSnapshotReader(this.snapshotId, (Iterator)new Iterator<Batch<ApiMessageAndVersion>>(){
                Batch<ApiMessageAndVersion> batch = null;

                private boolean loadNext() {
                    while (this.batch == null) {
                        if (!reader.hasNext()) {
                            return false;
                        }
                        KRaftBatchFileReader.KRaftBatch kRaftBatch = reader.next();
                        if (kRaftBatch.isControl()) {
                            Optional<LeaderAndEpoch> newLeaderChange = LocalDirectorySource.parseControlRecords(kRaftBatch.batch());
                            if (!newLeaderChange.isPresent()) continue;
                            leaderChange.set(newLeaderChange.get());
                            continue;
                        }
                        this.batch = kRaftBatch.batch();
                    }
                    return true;
                }

                @Override
                public boolean hasNext() {
                    return this.loadNext();
                }

                @Override
                public Batch<ApiMessageAndVersion> next() {
                    if (!this.loadNext()) {
                        throw new NoSuchElementException();
                    }
                    Batch<ApiMessageAndVersion> result = this.batch;
                    this.batch = null;
                    return result;
                }
            }, (AutoCloseable)reader));
            if (leaderChange.get() != null) {
                LocalDirectorySource.this.listener.handleLeaderChange((LeaderAndEpoch)leaderChange.get());
            }
            LocalDirectorySource.this.previousBaseOffset = this.snapshotId.offset;
            LocalDirectorySource.this.queue.append((EventQueue.Event)new SelectNextFile());
        }
    }

    class SelectNextFile
    implements EventQueue.Event {
        SelectNextFile() {
        }

        public void run() throws Exception {
            String name = LocalDirectorySource.this.selectNextFileName(false);
            if (name.isEmpty()) {
                LocalDirectorySource.this.beginShutdown("done");
            } else {
                LocalDirectorySource.this.scheduleLoadFile(name);
            }
        }

        public void handleException(Throwable e) {
            LocalDirectorySource.this.beginShutdown("error picking the next file");
        }
    }

    class SelectInitialFile
    implements EventQueue.Event {
        private final RaftClient.Listener<ApiMessageAndVersion> newListener;
        private final CompletableFuture<Void> future;

        SelectInitialFile(RaftClient.Listener<ApiMessageAndVersion> newListener) {
            this.newListener = newListener;
            this.future = new CompletableFuture();
        }

        public void run() throws Exception {
            LocalDirectorySource.this.listener = this.newListener;
            String name = LocalDirectorySource.this.selectNextFileName(true);
            if (name.isEmpty()) {
                throw new RuntimeException("Nothing to read found in " + LocalDirectorySource.this.metadataDirectory);
            }
            LocalDirectorySource.this.scheduleLoadFile(name);
            this.future.complete(null);
        }

        public void handleException(Throwable e) {
            this.future.completeExceptionally(e);
            LocalDirectorySource.this.beginShutdown("error selecting initial file");
        }

        CompletableFuture<Void> future() {
            return this.future;
        }
    }
}

