/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.metadata.util;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.message.LeaderChangeMessage;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Readable;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.FileLogInputStream;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.metadata.util.ClusterMetadataSource;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.Isolation;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
import org.apache.kafka.raft.ReplicatedLog;
import org.apache.kafka.raft.internals.MemoryBatchReader;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.snapshot.RawSnapshotReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalMetadataLogReader
implements ClusterMetadataSource {
    private static final Logger log = LoggerFactory.getLogger(LocalMetadataLogReader.class);
    private final MetadataRecordSerde serde = new MetadataRecordSerde();
    private final ReplicatedLog metadataLog;
    private final OptionalLong stopOffset;
    private final CompletableFuture<Void> future;

    public LocalMetadataLogReader(ReplicatedLog metadataLog, OptionalLong stopOffset) {
        this.metadataLog = metadataLog;
        this.stopOffset = stopOffset;
        this.future = new CompletableFuture();
    }

    @Override
    public void start(ClusterMetadataSource.Listener<ApiMessageAndVersion> listener) throws Exception {
        long processedOffset = 0L;
        Optional snapshotId = this.metadataLog.latestSnapshotId();
        if (snapshotId.isPresent()) {
            Optional snapshotOptional = this.metadataLog.readSnapshot((OffsetAndEpoch)snapshotId.get());
            if (!snapshotOptional.isPresent()) {
                throw new RuntimeException("Could not read latest snapshot " + snapshotId.get());
            }
            FileRecords snapshotRecords = (FileRecords)((RawSnapshotReader)snapshotOptional.get()).records();
            processedOffset = ((OffsetAndEpoch)snapshotId.get()).offset + 1L;
            if (this.stopOffset.isPresent() && this.stopOffset.getAsLong() < ((OffsetAndEpoch)snapshotId.get()).offset) {
                throw new RuntimeException("Given stop offset lower than snapshot end offset");
            }
            for (FileLogInputStream.FileChannelRecordBatch batch : snapshotRecords.batches()) {
                this.batchHelper(batch, listener);
            }
        }
        long end = this.metadataLog.endOffset().offset;
        while (processedOffset < end) {
            FileRecords current = (FileRecords)this.metadataLog.read((long)processedOffset, (Isolation)Isolation.UNCOMMITTED).records;
            for (FileLogInputStream.FileChannelRecordBatch batch : current.batches()) {
                this.batchHelper(batch, listener);
                processedOffset = batch.lastOffset();
                if (!this.stopOffset.isPresent() || processedOffset < this.stopOffset.getAsLong()) continue;
                processedOffset = end;
                break;
            }
            ++processedOffset;
        }
        this.future.complete(null);
        listener.refreshNodes();
    }

    private void batchHelper(FileLogInputStream.FileChannelRecordBatch batch, RaftClient.Listener<ApiMessageAndVersion> listener) {
        if (batch.isControlBatch()) {
            this.handleControlBatch(batch, listener);
        } else {
            this.handleMetadataBatch(batch, listener);
        }
    }

    private void handleControlBatch(FileLogInputStream.FileChannelRecordBatch batch, RaftClient.Listener<ApiMessageAndVersion> listener) {
        for (Record record : batch) {
            try {
                short typeId = ControlRecordType.parseTypeId((ByteBuffer)record.key());
                ControlRecordType type = ControlRecordType.fromTypeId((short)typeId);
                if (type.equals((Object)ControlRecordType.LEADER_CHANGE)) {
                    LeaderChangeMessage message = new LeaderChangeMessage();
                    message.read((Readable)new ByteBufferAccessor(record.value()), (short)0);
                    listener.handleLeaderChange(new LeaderAndEpoch(OptionalInt.of(message.leaderId()), batch.partitionLeaderEpoch()));
                    continue;
                }
                log.error("Ignoring control record with type {} at offset {}", (Object)type, (Object)record.offset());
            }
            catch (Throwable e) {
                log.error("unable to read control record at offset {}", (Object)record.offset(), (Object)e);
            }
        }
    }

    private void handleMetadataBatch(FileLogInputStream.FileChannelRecordBatch batch, RaftClient.Listener<ApiMessageAndVersion> listener) {
        ArrayList<ApiMessageAndVersion> messages = new ArrayList<ApiMessageAndVersion>();
        for (Record record : batch) {
            ByteBufferAccessor accessor = new ByteBufferAccessor(record.value());
            try {
                ApiMessageAndVersion messageAndVersion = this.serde.read((Readable)accessor, record.valueSize());
                messages.add(messageAndVersion);
            }
            catch (Throwable e) {
                log.error("unable to read metadata record at offset {}", (Object)record.offset(), (Object)e);
            }
        }
        listener.handleCommit((BatchReader)MemoryBatchReader.of(Collections.singletonList(Batch.data((long)batch.baseOffset(), (int)batch.partitionLeaderEpoch(), (long)batch.maxTimestamp(), (int)batch.sizeInBytes(), messages)), __ -> {}));
    }

    @Override
    public CompletableFuture<Void> caughtUpFuture() {
        return this.future;
    }

    @Override
    public void close() throws Exception {
        this.metadataLog.close();
    }
}

