/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.fetcher;

import io.confluent.kafka.storage.checksum.ChecksumParams;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;
import kafka.tier.fetcher.CancellationContext;
import kafka.tier.fetcher.ReclaimableMemoryRecords;
import kafka.tier.fetcher.TierSegmentReader;
import kafka.tier.fetcher.TierTimestampIndexIterator;
import kafka.utils.TestUtils;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.storage.internals.log.TimeIndex;
import org.apache.kafka.storage.internals.log.TimestampOffset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TierSegmentReaderTest {
    /** Reader under test, built with an empty string argument and reused by the helper methods of this class. */
    private final TierSegmentReader reader = new TierSegmentReader("");

    /**
     * Builds a segment from two back-to-back batches (base offsets 0 and 3, three records each)
     * and checks, for every target offset 0-5, that the read starts at the batch containing the
     * target and ends at offset 5. Targets 6 and 7 must raise {@link EOFException}.
     */
    @Test
    public void homogenousRecordBatchTest() throws IOException {
        SimpleRecord[] simpleRecords = {
            new SimpleRecord(1L, "foo".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
        };
        ByteBuffer firstBatch = MemoryRecords.withRecords(
                RecordBatch.MAGIC_VALUE_V2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, simpleRecords).buffer();
        ByteBuffer secondBatch = MemoryRecords.withRecords(
                RecordBatch.MAGIC_VALUE_V2, 3L, CompressionType.NONE, TimestampType.CREATE_TIME, simpleRecords).buffer();

        ByteBuffer combinedBuffer = ByteBuffer.allocate(firstBatch.limit() + secondBatch.limit());
        combinedBuffer.put(firstBatch).put(secondBatch);
        combinedBuffer.flip();

        // Offsets 0-2 live in the first batch, 3-5 in the second; the read always runs to offset 5.
        for (long target = 0L; target <= 5L; target++) {
            long expectedStart = target < 3L ? 0L : 3L;
            testExpected(combinedBuffer, target, expectedStart, 5L);
        }
        // Anything past the last offset must fail with EOF.
        for (long target = 6L; target <= 7L; target++) {
            testThrows(this.reader, combinedBuffer, target, EOFException.class);
        }
    }

    /**
     * Reads all batches through a stream whose single-byte {@code read()} throws an
     * {@link IOException} once every byte of the segment has been handed out, while the reader is
     * told the segment is twice its real size. The read is expected to finish without an
     * exception reaching the test, and every returned record must have consecutive offsets
     * starting at the target offset.
     */
    @Test
    public void testAbortOnEOF() {
        List<MemoryRecords> batches = this.createBatches();
        final int size = batches.stream().mapToInt(MemoryRecords::sizeInBytes).sum();
        try (final InputStream stream = this.toStream(batches)) {
            // Only read() is overridden; it delegates to the real stream until `size` bytes were served.
            InputStream faulty = new InputStream() {
                private int bytesRead = -1;

                @Override
                public int read() throws IOException {
                    if (this.bytesRead >= size - 1) {
                        throw new IOException("hit eof!");
                    }
                    ++this.bytesRead;
                    return stream.read();
                }
            };
            CancellationContext ctx = CancellationContext.newContext();
            // A fresh reader, named so that it does not shadow the class-level field.
            TierSegmentReader segmentReader = new TierSegmentReader("");
            long targetOffset = 100L;
            TierSegmentReader.RecordsAndNextBatchMetadata recordsAndMetadata =
                    segmentReader.readRecords(ctx, Optional.empty(), faulty, 0x100000, targetOffset, 0, size * 2);
            long expectedOffset = targetOffset;
            for (RecordBatch batch : recordsAndMetadata.records.batches()) {
                for (Record record : batch) {
                    Assertions.assertEquals(expectedOffset, record.offset(), "expected to find target offset 100 and all offsets after to be linearly increasing.");
                    ++expectedOffset;
                }
            }
        } catch (IOException e) {
            // Pass the exception as the cause so the failure report shows where it came from.
            Assertions.fail("expected no exception to be thrown", e);
        }
    }

    /**
     * Builds a segment from three batches with base offsets 3, 6 and 12 (three records each), so
     * offsets 0-2 and 9-11 are absent. For each target 0-14 the read must start at base offset
     * 3 (targets 0-5), 6 (targets 6-8) or 12 (targets 9-14) and end at offset 14.
     * Target 15 must raise {@link EOFException}.
     */
    @Test
    public void testReadRecordsMissingRecordsBetweenBatches() throws IOException {
        SimpleRecord[] simpleRecords = {
            new SimpleRecord(1L, "foo".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
        };
        ByteBuffer batchAt3 = MemoryRecords.withRecords(
                RecordBatch.MAGIC_VALUE_V2, 3L, CompressionType.NONE, TimestampType.CREATE_TIME, simpleRecords).buffer();
        ByteBuffer batchAt6 = MemoryRecords.withRecords(
                RecordBatch.MAGIC_VALUE_V2, 6L, CompressionType.NONE, TimestampType.CREATE_TIME, simpleRecords).buffer();
        ByteBuffer batchAt12 = MemoryRecords.withRecords(
                RecordBatch.MAGIC_VALUE_V2, 12L, CompressionType.NONE, TimestampType.CREATE_TIME, simpleRecords).buffer();

        ByteBuffer combinedBuffer = ByteBuffer.allocate(batchAt3.limit() + batchAt6.limit() + batchAt12.limit());
        combinedBuffer.put(batchAt3).put(batchAt6).put(batchAt12);
        combinedBuffer.flip();

        final long expectedEnd = 14L;
        for (long target = 0L; target <= expectedEnd; target++) {
            long expectedStart;
            if (target <= 5L) {
                expectedStart = 3L;
            } else if (target <= 8L) {
                expectedStart = 6L;
            } else {
                expectedStart = 12L;
            }
            testExpected(combinedBuffer, target, expectedStart, expectedEnd);
        }
        testThrows(this.reader, combinedBuffer, 15L, EOFException.class);
    }

    /**
     * Builds a single batch through a builder created with offset argument 1, appending records
     * only at offsets 3 and 5. Every target from 0 to 5 must yield that batch (first offset 1,
     * last offset 5); target 6 must raise {@link EOFException}.
     */
    @Test
    public void testReadRecordsMissingRecordsWithinBatch() throws IOException {
        SimpleRecord[] simpleRecords = {
            new SimpleRecord(1L, "foo".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
        };
        ByteBuffer initialBuffer = ByteBuffer.allocate(simpleRecords.length);
        MemoryRecordsBuilder builder = MemoryRecords.builder(
                initialBuffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME,
                1L, System.currentTimeMillis(), 0);
        builder.appendWithOffset(3L, simpleRecords[0]);
        builder.appendWithOffset(5L, simpleRecords[1]);
        ByteBuffer batchBytes = builder.build().buffer();

        ByteBuffer combinedBuffer = ByteBuffer.allocate(batchBytes.limit());
        combinedBuffer.put(batchBytes);
        combinedBuffer.flip();

        // Same target order as before (5 is checked ahead of 4).
        long[] targets = {0L, 1L, 2L, 3L, 5L, 4L};
        for (long target : targets) {
            testExpected(combinedBuffer, target, 1L, 5L);
        }
        testThrows(this.reader, combinedBuffer, 6L, EOFException.class);
    }

    /**
     * Runs the segment-read check with {@code maxBytes} equal to one batch, one byte less than a
     * batch, and exactly two batches.
     */
    @Test
    public void testReadRecordsOneBatchAlignedBoundaries() throws IOException {
        List<MemoryRecords> batches = createBatches();
        int singleBatchBytes = batches.get(0).sizeInBytes();
        int[] maxBytesCases = {singleBatchBytes, singleBatchBytes - 1, singleBatchBytes * 2};
        for (int maxBytes : maxBytesCases) {
            testReadSegment(batches, maxBytes);
        }
    }

    /** Runs the segment-read check with {@code maxBytes} one byte larger than a single batch. */
    @Test
    public void testReadRecordsOneBatchUnalignedBoundaries() throws IOException {
        List<MemoryRecords> batches = createBatches();
        int maxBytes = batches.get(0).sizeInBytes() + 1;
        testReadSegment(batches, maxBytes);
    }

    /** Runs the segment-read check with {@code maxBytes} two bytes smaller than a single batch. */
    @Test
    public void testReadRecordsOneBatchMaxLessThanBatchSize() throws IOException {
        List<MemoryRecords> batches = createBatches();
        int maxBytes = batches.get(0).sizeInBytes() - 2;
        testReadSegment(batches, maxBytes);
    }

    /** Runs the segment-read check with {@code maxBytes} equal to exactly three batches. */
    @Test
    public void testReadRecordsMultipleBatchesAligned() throws IOException {
        List<MemoryRecords> batches = createBatches();
        int maxBytes = batches.get(0).sizeInBytes() * 3;
        testReadSegment(batches, maxBytes);
    }

    /** Runs the segment-read check with {@code maxBytes} two bytes larger than three batches. */
    @Test
    public void testReadRecordsMultipleBatchesUnaligned1() throws IOException {
        List<MemoryRecords> batches = createBatches();
        int maxBytes = batches.get(0).sizeInBytes() * 3 + 2;
        testReadSegment(batches, maxBytes);
    }

    /** Runs the segment-read check with {@code maxBytes} two bytes smaller than three batches. */
    @Test
    public void testReadRecordsMultipleBatchesUnaligned2() throws IOException {
        List<MemoryRecords> batches = createBatches();
        int maxBytes = batches.get(0).sizeInBytes() * 3 - 2;
        testReadSegment(batches, maxBytes);
    }

    /**
     * Builds a segment from two batches: timestamps 1, 2, 3 at offsets 0-2 and timestamps 2, 5, 6
     * at offsets 3-5. Each looked-up timestamp must resolve to the earliest offset carrying that
     * timestamp (for example timestamp 2 resolves to offset 1, not 3).
     */
    @Test
    public void offsetForTimestampTest() {
        ByteBuffer firstBatch = MemoryRecords.withRecords(
                RecordBatch.MAGIC_VALUE_V2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME,
                new SimpleRecord(1L, "foo".getBytes(), "1".getBytes()),
                new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
                new SimpleRecord(3L, "c".getBytes(), "3".getBytes())).buffer();
        ByteBuffer secondBatch = MemoryRecords.withRecords(
                RecordBatch.MAGIC_VALUE_V2, 3L, CompressionType.NONE, TimestampType.CREATE_TIME,
                new SimpleRecord(2L, "foo".getBytes(), "1".getBytes()),
                new SimpleRecord(5L, "b".getBytes(), "2".getBytes()),
                new SimpleRecord(6L, "c".getBytes(), "3".getBytes())).buffer();

        ByteBuffer combinedBuffer = ByteBuffer.allocate(firstBatch.limit() + secondBatch.limit());
        combinedBuffer.put(firstBatch);
        combinedBuffer.put(secondBatch);
        // Flip like the sibling tests so limit() reflects the bytes actually written rather than
        // relying on the capacity happening to match.
        combinedBuffer.flip();

        this.assertCorrectOffsetForTimestamp(combinedBuffer, 1L, Optional.of(0L));
        this.assertCorrectOffsetForTimestamp(combinedBuffer, 2L, Optional.of(1L));
        this.assertCorrectOffsetForTimestamp(combinedBuffer, 3L, Optional.of(2L));
        this.assertCorrectOffsetForTimestamp(combinedBuffer, 5L, Optional.of(4L));
        this.assertCorrectOffsetForTimestamp(combinedBuffer, 6L, Optional.of(5L));
    }

    /**
     * Writes four timestamp/offset entries to a {@link TimeIndex} backed by a temporary file,
     * closes the index, then reads the file back through {@link TierTimestampIndexIterator} and
     * expects the same entries in the same order. The temporary file is deleted afterwards.
     */
    @Test
    public void timestampIndexIteratorTest() {
        try {
            File file = File.createTempFile("kafka", ".tmp");
            try {
                long baseOffset = 1000L;
                List<TimestampOffset> expected = new ArrayList<>();
                TimeIndex timeIndex = new TimeIndex(file, baseOffset, 800, false, true, new ChecksumParams(Optional.of(TestUtils.createChecksumStore()), true, true));
                timeIndex.resize(800);
                timeIndex.flush();
                expected.add(new TimestampOffset(20000L, 14000L));
                expected.add(new TimestampOffset(30000L, 18000L));
                expected.add(new TimestampOffset(40000L, 20000L));
                expected.add(new TimestampOffset(40001L, 20001L));
                for (TimestampOffset tso : expected) {
                    timeIndex.maybeAppend(tso.timestamp, tso.offset, false);
                }
                timeIndex.flush();
                timeIndex.close();

                List<Object> actual = new ArrayList<>();
                // Scoped so the file stream is closed before the temporary file is deleted.
                try (InputStream indexStream = new FileInputStream(file)) {
                    TierTimestampIndexIterator iterator = new TierTimestampIndexIterator(indexStream, baseOffset);
                    while (iterator.hasNext()) {
                        actual.add(iterator.next());
                    }
                }
                Assertions.assertEquals(expected, actual);
            } finally {
                file.delete();
            }
        } catch (IOException ioe) {
            // Keep the original exception as the cause so the failure report shows its stack trace.
            Assertions.fail(ioe.getMessage(), ioe);
        }
    }

    /**
     * Builds one uncompressed magic-v2 {@link MemoryRecords} instance holding the given records.
     *
     * @param records     records to append, in order
     * @param baseOffset  offset value handed to the builder
     * @param leaderEpoch epoch value handed to the builder
     * @return the built records
     */
    private MemoryRecords createRecords(List<SimpleRecord> records, long baseOffset, int leaderEpoch) {
        int requiredBytes = DefaultRecordBatch.sizeInBytes(records);
        ByteBuffer backingBuffer = ByteBuffer.allocate(requiredBytes);
        long nowMs = System.currentTimeMillis();
        MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(
                backingBuffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE, TimestampType.LOG_APPEND_TIME,
                baseOffset, nowMs, leaderEpoch);
        records.forEach(recordsBuilder::append);
        return recordsBuilder.build();
    }

    /**
     * Concatenates the bytes of all given record sets, in list order, and exposes them as a stream.
     *
     * @param records record sets to concatenate
     * @return a stream over the concatenated bytes
     */
    private InputStream toStream(List<MemoryRecords> records) {
        int combinedSize = 0;
        for (MemoryRecords batch : records) {
            combinedSize += batch.sizeInBytes();
        }
        ByteBuffer combined = ByteBuffer.allocate(combinedSize);
        records.forEach(batch -> combined.put(batch.buffer()));
        // Switch from writing to reading so the stream starts at the first byte.
        combined.flip();
        return new ByteBufferInputStream(combined);
    }

    /**
     * Creates four batches of three records each (keys k1-k3, values v1-v3, timestamps 1-3),
     * with base offsets 100, 103, 106 and 109.
     *
     * @return the batches in ascending base-offset order
     */
    private List<MemoryRecords> createBatches() {
        List<SimpleRecord> records = new ArrayList<SimpleRecord>();
        for (int i = 1; i <= 3; i++) {
            records.add(new SimpleRecord((long) i, ("k" + i).getBytes(), ("v" + i).getBytes()));
        }
        List<MemoryRecords> batches = new ArrayList<MemoryRecords>();
        long[] baseOffsets = {100L, 103L, 106L, 109L};
        for (long baseOffset : baseOffsets) {
            batches.add(createRecords(records, baseOffset, 0));
        }
        return batches;
    }

    /**
     * Reads the whole buffer as one segment, starting from {@code target}, and asserts the base
     * offset of the first returned batch and the last offset of the final returned batch.
     * Both observed values stay {@code null} when the read returns zero bytes.
     *
     * @param combinedBuffer segment bytes; its position is reset to 0 before reading
     * @param target         target offset passed to the reader
     * @param expectedStart  expected base offset of the first batch
     * @param expectedEnd    expected last offset of the final batch
     */
    private void testExpected(ByteBuffer combinedBuffer, Long target, Long expectedStart, Long expectedEnd) throws IOException {
        combinedBuffer.position(0);
        InputStream segmentStream = new ByteBufferInputStream(combinedBuffer);
        CancellationContext parentContext = CancellationContext.newContext();
        ReclaimableMemoryRecords records = this.reader.readRecords(
                parentContext.subContext(), Optional.empty(), segmentStream, 1000,
                target.longValue(), 0, combinedBuffer.limit()).records;

        Long observedFirst = null;
        Long observedLast = null;
        if (records.sizeInBytes() != 0) {
            Iterator<? extends RecordBatch> batchIterator = records.batches().iterator();
            while (batchIterator.hasNext()) {
                RecordBatch batch = batchIterator.next();
                if (observedFirst == null) {
                    observedFirst = batch.baseOffset();
                }
                observedLast = batch.lastOffset();
            }
        }
        Assertions.assertEquals(expectedStart, observedFirst);
        Assertions.assertEquals(expectedEnd, observedLast);
    }

    /**
     * Asserts that reading the whole buffer as one segment, starting from {@code target}, throws
     * the given exception type.
     *
     * @param reader            reader to exercise
     * @param combinedBuffer    segment bytes; its position is reset to 0 before reading
     * @param target            target offset passed to the reader
     * @param expectedThrowable exception type the read must raise
     */
    private static <T extends Throwable> void testThrows(TierSegmentReader reader, ByteBuffer combinedBuffer, long target, Class<T> expectedThrowable) {
        combinedBuffer.position(0);
        InputStream segmentStream = new ByteBufferInputStream(combinedBuffer);
        CancellationContext parentContext = CancellationContext.newContext();
        int segmentSize = combinedBuffer.limit();
        Assertions.assertThrows(
                expectedThrowable,
                () -> reader.readRecords(parentContext.subContext(), Optional.empty(), segmentStream, 1000, target, 0, segmentSize));
    }

    /**
     * Reads the concatenated batches with the given {@code maxBytes} limit, once per target offset
     * (each batch's base offset and base offset + 1), and verifies the result.
     *
     * <p>For each target, the first expected batch is the one with the greatest base offset not
     * above the target. The expected batch count is {@code maxBytes} divided by that batch's
     * size, but at least one. Returned batches must carry the expected base offsets in order.
     * When a further batch exists after the last expected one, the returned next-batch metadata
     * must point at it (offset, byte position and, if present, size); otherwise that metadata
     * must be {@code null}.
     *
     * @param batches  batches forming the segment, in order
     * @param maxBytes byte limit passed to the reader
     */
    private void testReadSegment(List<MemoryRecords> batches, int maxBytes) throws IOException {
        TreeMap<Long, BatchAndPosition> offsetToPosition = new TreeMap<>();
        List<Long> targetOffsets = new ArrayList<>();
        CancellationContext ctx = CancellationContext.newContext();
        int batchSize = batches.get(0).firstBatchSize();
        int segmentSize = batchSize * batches.size();
        int position = 0;
        for (MemoryRecords batch : batches) {
            RecordBatch firstBatch = batch.firstBatch();
            offsetToPosition.put(firstBatch.baseOffset(), new BatchAndPosition(batch, position));
            targetOffsets.add(firstBatch.baseOffset());
            targetOffsets.add(firstBatch.baseOffset() + 1L);
            position += batch.sizeInBytes();
        }

        for (long targetOffset : targetOffsets) {
            long expectedFirstOffset = offsetToPosition.floorKey(targetOffset);
            BatchAndPosition expectedFirstBatchAndPosition = offsetToPosition.get(expectedFirstOffset);
            int expectedNumBatches = Math.max(1, maxBytes / expectedFirstBatchAndPosition.records.sizeInBytes());
            // Never empty: the tail set includes expectedFirstOffset and the limit is at least 1.
            TreeSet<Long> expectedOffsets = offsetToPosition.navigableKeySet().tailSet(expectedFirstOffset).stream()
                    .limit(expectedNumBatches)
                    .collect(Collectors.toCollection(TreeSet::new));
            Long nextOffset = offsetToPosition.higherKey(expectedOffsets.last());

            InputStream stream = this.toStream(batches);
            TierSegmentReader.RecordsAndNextBatchMetadata result =
                    this.reader.readRecords(ctx, Optional.empty(), stream, maxBytes, targetOffset, 0, segmentSize);

            Iterator<Long> expectedOffsetsIt = expectedOffsets.iterator();
            RecordBatch lastBatch = null;
            for (RecordBatch batch : result.records.batches()) {
                Assertions.assertEquals((long) expectedOffsetsIt.next(), batch.baseOffset());
                lastBatch = batch;
            }

            if (nextOffset == null) {
                Assertions.assertNull((Object) result.nextOffsetAndBatchMetadata);
                continue;
            }

            // Fail with a clear message instead of a NullPointerException if nothing was returned.
            Assertions.assertNotNull(lastBatch, "expected at least one batch for target offset " + targetOffset);
            BatchAndPosition nextBatch = offsetToPosition.get(nextOffset);
            Assertions.assertEquals((long) lastBatch.nextOffset(), (long) result.nextOffsetAndBatchMetadata.nextOffset);
            Assertions.assertEquals((long) nextOffset, (long) result.nextOffsetAndBatchMetadata.nextOffset);
            Assertions.assertEquals((int) nextBatch.bytePosition, (int) result.nextOffsetAndBatchMetadata.nextBatchMetadata.bytePosition);
            if (result.nextOffsetAndBatchMetadata.nextBatchMetadata.recordBatchSize.isPresent()) {
                Assertions.assertEquals((int) nextBatch.records.sizeInBytes(), (int) result.nextOffsetAndBatchMetadata.nextBatchMetadata.recordBatchSize.getAsInt());
            }
        }
    }

    /**
     * Looks up {@code targetTimestamp} in the buffer, treated as one segment, and asserts the
     * offset the reader returns. An {@link IOException} from the reader fails the test.
     *
     * @param combinedBuffer  segment bytes; its position is reset to 0 before reading
     * @param targetTimestamp timestamp to look up
     * @param expectedOffset  expected lookup result
     */
    private void assertCorrectOffsetForTimestamp(ByteBuffer combinedBuffer, long targetTimestamp, Optional<Long> expectedOffset) {
        combinedBuffer.position(0);
        ByteBufferInputStream is = new ByteBufferInputStream(combinedBuffer);
        CancellationContext cancellationContext = CancellationContext.newContext();
        try {
            Optional<Long> actualOffset = this.reader.offsetForTimestamp(cancellationContext.subContext(), (InputStream) is, targetTimestamp, combinedBuffer.limit());
            Assertions.assertEquals(expectedOffset, actualOffset);
        } catch (IOException ioe) {
            // Attach the exception as the cause so the failure report shows what went wrong.
            Assertions.fail("IOexception encountered", ioe);
        }
    }

    /** Immutable pair of a record set and the byte position recorded for it by the test that builds it. */
    private static class BatchAndPosition {
        /** The record set. */
        final MemoryRecords records;
        /** Byte position associated with {@link #records}. */
        final int bytePosition;

        BatchAndPosition(MemoryRecords records, int bytePosition) {
            this.records = records;
            this.bytePosition = bytePosition;
        }
    }
}

