/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.domain;

import com.google.flatbuffers.FlatBufferBuilder;
import io.confluent.kafka.storage.tier.serdes.OffsetAndEpoch;
import io.confluent.kafka.storage.tier.serdes.OffsetRange;
import io.confluent.kafka.storage.tier.serdes.TierTopicSnapshot;
import io.confluent.kafka.storage.tier.serdes.TierTopicSnapshotEntry;
import io.confluent.kafka.storage.tier.serdes.TierTopicSnapshotHeader;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TierTopicPartitionSnapshot {
    private static final Logger log = LoggerFactory.getLogger(TierTopicPartitionSnapshot.class);
    private static final byte VERSION_0 = 0;
    private static final byte CURRENT_VERSION = 0;
    private static final int INITIAL_BUFFER_SIZE = 500;
    private static final int ESTIMATED_SNAPSHOT_SIZE = 8192;
    private static final int UNKNOWN_LEADER_EPOCH = -1;
    private static final String TIER_TOPIC_NAME = "_confluent-tier-state";
    private static final TimestampType TIMESTAMP_TYPE = TimestampType.CREATE_TIME;
    private final TierTopicSnapshot tierTopicSnapshot;
    private final Long startTimestampMs;
    private final Long endTimestampMs;

    TierTopicPartitionSnapshot(TierTopicSnapshot snapshot, Long startTimestampMs, Long endTimestampMs) {
        this.tierTopicSnapshot = snapshot;
        this.startTimestampMs = startTimestampMs;
        this.endTimestampMs = endTimestampMs;
    }

    public TierTopicPartitionSnapshot(List<ConsumerRecords<byte[], byte[]>> recordsBuffer, List<Long> nextOffsets) {
        TreeMap<Integer, Long> startOffsetMap = new TreeMap<Integer, Long>();
        TreeMap<Integer, Long> endOffsetMap = new TreeMap<Integer, Long>();
        long minTimestampMs = Long.MAX_VALUE;
        long maxTimestampMs = Long.MIN_VALUE;
        FlatBufferBuilder builder = new FlatBufferBuilder(500).forceDefaults(false);
        int[] entryOffsets = new int[recordsBuffer.stream().mapToInt(ConsumerRecords::count).sum()];
        int idx = 0;
        for (ConsumerRecords<byte[], byte[]> records : recordsBuffer) {
            for (ConsumerRecord record : records) {
                int partition = record.partition();
                minTimestampMs = Math.min(record.timestamp(), minTimestampMs);
                maxTimestampMs = Math.max(record.timestamp(), maxTimestampMs);
                long eventOffset = record.offset();
                startOffsetMap.putIfAbsent(partition, eventOffset);
                endOffsetMap.put(partition, eventOffset);
                int keyOffset = TierTopicSnapshotEntry.createKeyVector((FlatBufferBuilder)builder, (byte[])((byte[])record.key()));
                int valueOffset = TierTopicSnapshotEntry.createValueVector((FlatBufferBuilder)builder, (byte[])((byte[])record.value()));
                TierTopicSnapshotEntry.startTierTopicSnapshotEntry((FlatBufferBuilder)builder);
                TierTopicSnapshotEntry.addPartition((FlatBufferBuilder)builder, (byte)((byte)partition));
                int offsetAndEpochOffset = OffsetAndEpoch.createOffsetAndEpoch((FlatBufferBuilder)builder, (long)eventOffset, (int)record.leaderEpoch().orElse(-1));
                TierTopicSnapshotEntry.addOffsetAndEpoch((FlatBufferBuilder)builder, (int)offsetAndEpochOffset);
                TierTopicSnapshotEntry.addTimestamp((FlatBufferBuilder)builder, (long)record.timestamp());
                TierTopicSnapshotEntry.addKey((FlatBufferBuilder)builder, (int)keyOffset);
                TierTopicSnapshotEntry.addValue((FlatBufferBuilder)builder, (int)valueOffset);
                int entryOffset = TierTopicSnapshotEntry.endTierTopicSnapshotEntry((FlatBufferBuilder)builder);
                TierTopicSnapshotEntry.finishTierTopicSnapshotEntryBuffer((FlatBufferBuilder)builder, (int)entryOffset);
                entryOffsets[idx++] = entryOffset;
            }
        }
        for (int i = 0; i < nextOffsets.size(); ++i) {
            startOffsetMap.putIfAbsent(i, nextOffsets.get(i));
            endOffsetMap.putIfAbsent(i, nextOffsets.get(i));
        }
        int entriesOffset = TierTopicSnapshot.createTierTopicSnapshotEntriesVector((FlatBufferBuilder)builder, (int[])entryOffsets);
        TierTopicSnapshotHeader.startOffsetsVector((FlatBufferBuilder)builder, (int)startOffsetMap.size());
        for (int i = startOffsetMap.size() - 1; i >= 0; --i) {
            OffsetRange.createOffsetRange((FlatBufferBuilder)builder, (long)((Long)startOffsetMap.get(i)), (long)((Long)endOffsetMap.get(i)));
        }
        int offsetsOffset = builder.endVector();
        int headerOffset = TierTopicSnapshotHeader.createTierTopicSnapshotHeader((FlatBufferBuilder)builder, (int)offsetsOffset);
        TierTopicSnapshot.startTierTopicSnapshot((FlatBufferBuilder)builder);
        TierTopicSnapshot.addVersion((FlatBufferBuilder)builder, (byte)0);
        TierTopicSnapshot.addTierTopicSnapshotHeader((FlatBufferBuilder)builder, (int)headerOffset);
        TierTopicSnapshot.addTierTopicSnapshotEntries((FlatBufferBuilder)builder, (int)entriesOffset);
        int entryId = TierTopicSnapshot.endTierTopicSnapshot((FlatBufferBuilder)builder);
        TierTopicSnapshot.finishTierTopicSnapshotBuffer((FlatBufferBuilder)builder, (int)entryId);
        this.tierTopicSnapshot = TierTopicSnapshot.getRootAsTierTopicSnapshot((ByteBuffer)builder.dataBuffer());
        this.startTimestampMs = minTimestampMs;
        this.endTimestampMs = maxTimestampMs;
    }

    public static TierTopicPartitionSnapshot read(FileChannel channel, Long startTimestampMs, Long endTimestampMs) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate((int)channel.size());
        Utils.readFully((FileChannel)channel, (ByteBuffer)buf, (long)0L);
        buf.flip();
        return new TierTopicPartitionSnapshot(TierTopicSnapshot.getRootAsTierTopicSnapshot((ByteBuffer)buf), startTimestampMs, endTimestampMs);
    }

    public static TierTopicPartitionSnapshot read(InputStream stream, Long startTimestampMs, Long endTimestampMs) throws IOException {
        ByteBuffer buffer = ByteBuffer.wrap(Utils.readFullyToArray((InputStream)stream, (int)8192));
        log.debug("Tier topic snapshot buffer: " + buffer);
        return new TierTopicPartitionSnapshot(TierTopicSnapshot.getRootAsTierTopicSnapshot((ByteBuffer)buffer), startTimestampMs, endTimestampMs);
    }

    public ByteBuffer payloadBuffer() {
        return this.tierTopicSnapshot.getByteBuffer().duplicate();
    }

    public List<ConsumerRecord<byte[], byte[]>> entries() {
        ArrayList<ConsumerRecord<byte[], byte[]>> output = new ArrayList<ConsumerRecord<byte[], byte[]>>();
        TierTopicSnapshotEntry.Vector vector = this.tierTopicSnapshot.tierTopicSnapshotEntriesVector();
        int size = vector.length();
        for (int i = 0; i < size; ++i) {
            TierTopicSnapshotEntry entry = vector.get(i);
            byte[] key = new byte[entry.keyLength()];
            entry.keyAsByteBuffer().get(key);
            byte[] value = new byte[entry.valueLength()];
            entry.valueAsByteBuffer().get(value);
            output.add(TierTopicPartitionSnapshot.makeTierTopicRecord(entry.partition(), entry.offsetAndEpoch().offset(), entry.timestamp(), key, value, entry.offsetAndEpoch().epoch()));
        }
        return output;
    }

    public long startTimestampMs() {
        return this.startTimestampMs;
    }

    public long endTimestampMs() {
        return this.endTimestampMs;
    }

    public long startOffset(int partition) {
        if (partition >= this.tierTopicSnapshot.tierTopicSnapshotHeader().offsetsLength()) {
            throw new IndexOutOfBoundsException("Invalid partition number " + partition);
        }
        return this.tierTopicSnapshot.tierTopicSnapshotHeader().offsets(partition).start();
    }

    public Long endOffset(int partition) {
        if (partition >= this.tierTopicSnapshot.tierTopicSnapshotHeader().offsetsLength()) {
            throw new IndexOutOfBoundsException("Invalid partition number " + partition);
        }
        return this.tierTopicSnapshot.tierTopicSnapshotHeader().offsets(partition).end();
    }

    public static ConsumerRecord<byte[], byte[]> makeTierTopicRecord(int partition, long offset, long timestamp, byte[] key, byte[] value, int leaderEpoch) {
        RecordHeaders headers = new RecordHeaders();
        return new ConsumerRecord(TIER_TOPIC_NAME, partition, offset, timestamp, TIMESTAMP_TYPE, key.length, value.length, (Object)key, (Object)value, (Headers)headers, leaderEpoch == -1 ? Optional.empty() : Optional.of(leaderEpoch));
    }

    public String toString() {
        return "TierTopicPartitionSnapshot{tierTopicSnapshot=" + this.tierTopicSnapshot + ", startTimestampMs=" + this.startTimestampMs + ", endTimestampMs=" + this.endTimestampMs + '}';
    }
}

