/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.metadata.util;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.metadata.util.ClusterMetadataSource;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.Isolation;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.ReplicatedLog;
import org.apache.kafka.raft.internals.RecordsIterator;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.serialization.RecordSerde;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.apache.kafka.snapshot.RecordsSnapshotReader;
import org.apache.kafka.snapshot.SnapshotReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Replays metadata from a local {@link ReplicatedLog} into a {@link RaftClient.Listener}.
 *
 * <p>{@link #start(RaftClient.Listener)} first hands the listener the latest snapshot found at or
 * below the stop offset (if there is one) and then feeds it log batches until either the stop
 * offset or the log end offset is reached. The stop offset is exclusive: the batch whose base
 * offset equals the stop offset is not delivered, and a stop offset that falls inside a batch is
 * rejected with {@link MisalignedStopOffsetException}.
 */
public class LocalMetadataLogReader implements ClusterMetadataSource {
    private static final Logger log = LoggerFactory.getLogger(LocalMetadataLogReader.class);

    /**
     * Size argument (8 MiB, i.e. {@code 0x800000}) passed both to the snapshot reader and to the
     * log records iterator.
     */
    private static final int MAX_BATCH_SIZE_BYTES = 8 * 1024 * 1024;

    private final ReplicatedLog metadataLog;
    private final OptionalLong stopOffset;
    /** End offset reached by the last {@link #start} call; {@code -1} until a load completes. */
    private final AtomicLong loadedEndOffset;
    private final AtomicReference<OptionalLong> highWaterMark;

    /**
     * @param metadataLog the log to read snapshots and batches from; closed by {@link #close()}
     * @param stopOffset  exclusive offset at which loading stops, or empty to load to the end of
     *                    the log
     */
    public LocalMetadataLogReader(ReplicatedLog metadataLog, OptionalLong stopOffset) {
        this.metadataLog = metadataLog;
        this.stopOffset = stopOffset;
        this.loadedEndOffset = new AtomicLong(-1L);
        this.highWaterMark = new AtomicReference<>(OptionalLong.empty());
    }

    /**
     * Loads the snapshot (if any) and then the log batches into {@code listener}, and records the
     * resulting end offset as both the loaded end offset and the high watermark.
     *
     * @param listener receives {@code handleLoadSnapshot} and {@code handleCommit} callbacks
     * @throws MisalignedStopOffsetException if the stop offset is not aligned with the start of a
     *                                       batch
     */
    @Override
    public void start(RaftClient.Listener<ApiMessageAndVersion> listener) throws Exception {
        log.info("Loading metadata up to stop offset {}", this.stopOffset);
        // An absent stop offset means "no limit".
        long effectiveStopOffset = this.stopOffset.orElse(Long.MAX_VALUE);
        Optional<OffsetAndEpoch> loadedSnapshotId = maybeLoadFromSnapshot(listener, effectiveStopOffset);
        long endOffset = maybeLoadFromLogSegments(listener, loadedSnapshotId, effectiveStopOffset);
        log.info("Completed load of metadata at end offset {}", endOffset);
        this.loadedEndOffset.set(endOffset);
        this.highWaterMark.set(OptionalLong.of(endOffset));
    }

    /**
     * Passes the latest snapshot at or below {@code stopOffset} to the listener, if one exists.
     *
     * @return the id of the snapshot that was handed to the listener, or empty if none was found
     */
    private Optional<OffsetAndEpoch> maybeLoadFromSnapshot(
            RaftClient.Listener<ApiMessageAndVersion> listener, long stopOffset) {
        Optional<RawSnapshotReader> snapshot = this.metadataLog.latestSnapshotAtOrBelow(stopOffset);
        if (!snapshot.isPresent()) {
            return Optional.empty();
        }
        RawSnapshotReader rawSnapshotReader = snapshot.get();
        SnapshotReader<ApiMessageAndVersion> snapshotReader = RecordsSnapshotReader.of(
                rawSnapshotReader,
                MetadataRecordSerde.INSTANCE,
                BufferSupplier.create(),
                MAX_BATCH_SIZE_BYTES,
                true);
        listener.handleLoadSnapshot(snapshotReader);
        return Optional.of(rawSnapshotReader.snapshotId());
    }

    /**
     * Feeds log batches to the listener, starting right after the loaded snapshot (or at offset 0
     * when no snapshot was loaded) and ending at the smaller of {@code stopOffset} and the log end
     * offset.
     *
     * @return the offset following the last batch delivered; this is the snapshot offset when
     *         nothing could be read from the log
     * @throws RuntimeException if a delivered reader cannot report its last offset
     */
    private long maybeLoadFromLogSegments(
            RaftClient.Listener<ApiMessageAndVersion> listener,
            Optional<OffsetAndEpoch> loadedSnapshotId,
            long stopOffset) {
        long loadedSnapshotOffset = loadedSnapshotId.map(OffsetAndEpoch::offset).orElse(0L);
        // The log no longer contains the offset we would have to resume from, so there is
        // nothing contiguous to replay on top of the snapshot.
        if (loadedSnapshotOffset < this.metadataLog.startOffset()) {
            return loadedSnapshotOffset;
        }
        // Past the guard above the snapshot offset is at or beyond the log start offset, so it is
        // the position to resume from.
        long nextOffset = loadedSnapshotOffset;
        long endOffset = Math.min(stopOffset, this.metadataLog.endOffset().offset());
        while (nextOffset < endOffset) {
            BatchReader<ApiMessageAndVersion> reader = readFromLog(nextOffset, stopOffset);
            // Stop when the read produced nothing deliverable, or when it does not begin exactly
            // at the expected offset.
            if (!reader.hasNext() || reader.baseOffset() != nextOffset) {
                break;
            }
            listener.handleCommit(reader);
            OptionalLong lastOffset = reader.lastOffset();
            if (!lastOffset.isPresent()) {
                throw new RuntimeException(
                        "Failed to read batches at offset " + nextOffset + " from reader " + reader);
            }
            nextOffset = lastOffset.getAsLong() + 1L;
        }
        return nextOffset;
    }

    /**
     * Reads the log at {@code readOffset} and wraps the decoded batches in a reader bounded by
     * {@code stopOffset}.
     *
     * <p>The records iterator is closed before returning; this is safe because
     * {@link BoundedBatchReader} copies every batch it needs inside its constructor.
     */
    private BatchReader<ApiMessageAndVersion> readFromLog(long readOffset, long stopOffset) {
        Records records = this.metadataLog.read(readOffset, Isolation.UNCOMMITTED).records;
        try (RecordsIterator<ApiMessageAndVersion> recordsIterator = new RecordsIterator<>(
                records, MetadataRecordSerde.INSTANCE, BufferSupplier.NO_CACHING, MAX_BATCH_SIZE_BYTES, true)) {
            return new BoundedBatchReader(recordsIterator, stopOffset);
        }
    }

    /** Returns the end offset of the last completed load, or empty before any load completes. */
    @Override
    public OptionalLong highWaterMark() {
        return this.highWaterMark.get();
    }

    /** Closes the underlying metadata log. */
    @Override
    public void close() throws Exception {
        this.metadataLog.close();
    }

    /**
     * Returns the end offset reached by {@link #start}, or empty if no load has completed yet.
     */
    public OptionalLong loadedEndOffset() {
        long endOffset = this.loadedEndOffset.get();
        return endOffset < 0L ? OptionalLong.empty() : OptionalLong.of(endOffset);
    }

    @Override
    public String toString() {
        return "LocalMetadataLogReader(metadataLog=" + this.metadataLog + ", stopOffset=" + this.stopOffset + ")";
    }

    /**
     * A {@link BatchReader} over an in-memory copy of the batches that precede an exclusive stop
     * offset.
     */
    private static class BoundedBatchReader implements BatchReader<ApiMessageAndVersion> {
        private final List<Batch<ApiMessageAndVersion>> batches = new ArrayList<>();
        private final Iterator<Batch<ApiMessageAndVersion>> iterator;
        private final long stopOffset;
        /** The batch the next call to {@link #next()} returns, or {@code null} when exhausted. */
        private Batch<ApiMessageAndVersion> next;

        /**
         * Drains {@code iterator} up to (not including) the batch that starts at
         * {@code stopOffset}.
         *
         * @throws MisalignedStopOffsetException if a batch starts beyond {@code stopOffset}
         *         without one starting exactly at it, or if the first retained batch contains
         *         {@code stopOffset}
         */
        private BoundedBatchReader(RecordsIterator<ApiMessageAndVersion> iterator, long stopOffset) {
            while (iterator.hasNext()) {
                Batch<ApiMessageAndVersion> batch = iterator.next();
                if (batch.baseOffset() == stopOffset) {
                    break;
                }
                if (batch.baseOffset() > stopOffset) {
                    throw new MisalignedStopOffsetException("Stop offset " + stopOffset + " must be aligned with the start of a batch, but the next batch begins at offset " + batch.baseOffset());
                }
                this.batches.add(batch);
            }
            this.iterator = this.batches.iterator();
            this.stopOffset = stopOffset;
            maybeComputeNext();
        }

        /**
         * @return the base offset of the first retained batch
         * @throws RuntimeException if no batch was retained
         */
        @Override
        public long baseOffset() {
            if (this.batches.isEmpty()) {
                throw new RuntimeException("Batch reader contains no batches.");
            }
            return this.batches.get(0).baseOffset();
        }

        /**
         * @return the last offset of the last retained batch, or empty if that offset is negative
         * @throws RuntimeException if no batch was retained
         */
        @Override
        public OptionalLong lastOffset() {
            if (this.batches.isEmpty()) {
                throw new RuntimeException("Batch reader contains no batches.");
            }
            long lastOffset = this.batches.get(this.batches.size() - 1).lastOffset();
            return lastOffset < 0L ? OptionalLong.empty() : OptionalLong.of(lastOffset);
        }

        /** Nothing to release: the batches are plain in-memory copies. */
        @Override
        public void close() {
        }

        @Override
        public boolean hasNext() {
            return this.next != null;
        }

        /**
         * Advances {@link #next} to the following retained batch, validating that the stop offset
         * does not fall inside it.
         */
        private void maybeComputeNext() {
            if (!this.iterator.hasNext()) {
                this.next = null;
                return;
            }
            this.next = this.iterator.next();
            if (this.next.baseOffset() == this.stopOffset) {
                // Defensive: the constructor already drops the batch starting at the stop offset.
                this.next = null;
            } else if (this.next.lastOffset() >= this.stopOffset) {
                // The stop offset is exclusive, so a batch whose last offset equals it still
                // contains a record that must not be delivered.
                throw new MisalignedStopOffsetException("Invalid stop offset " + this.stopOffset + ", which falls in the middle of batch " + this.next + " (it must be aligned with the start of a batch)");
            }
        }

        /**
         * @throws NoSuchElementException if there is no further batch
         * @throws MisalignedStopOffsetException if the batch after the returned one contains the
         *         stop offset
         */
        @Override
        public Batch<ApiMessageAndVersion> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            Batch<ApiMessageAndVersion> current = this.next;
            maybeComputeNext();
            return current;
        }
    }

    /** Thrown when the requested stop offset does not coincide with the start of a batch. */
    public static class MisalignedStopOffsetException extends RuntimeException {
        public MisalignedStopOffsetException(String message) {
            super(message);
        }
    }
}

