/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.fetcher;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.stream.IntStream;
import kafka.log.AbortedTxn;
import kafka.log.Log;
import kafka.log.LogConfig;
import kafka.log.LogSegment;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.fetcher.CancellationContext;
import kafka.tier.fetcher.PendingFetch;
import kafka.tier.fetcher.TierAbortedTxnReader;
import kafka.tier.fetcher.TierFetchResult;
import kafka.tier.fetcher.offsetcache.FetchOffsetCache;
import kafka.tier.store.MockInMemoryTierObjectStore;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreConfig;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;
import scala.Option;
import scala.collection.JavaConversions;
import scala.collection.Seq;
import scala.collection.immutable.HashSet;
import scala.collection.immutable.Set;
import scala.compat.java8.OptionConverters;

public class SegmentFileFetchRequestTest {
    private MockTime mockTime = new MockTime();
    private Executor currentThreadExecutor = Runnable::run;
    private long baseTimestamp = 1500000000000L;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void targetOffsetTest() {
        CancellationContext ctx = CancellationContext.newContext();
        MockInMemoryTierObjectStore tierObjectStore = new MockInMemoryTierObjectStore(new TierObjectStoreConfig("cluster", Integer.valueOf(1)));
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TopicPartition topicPartition = topicIdPartition.topicPartition();
        LogSegment segment = this.createSegment(0L, 3, 50);
        try {
            TierObjectStore.ObjectMetadata metadata = new TierObjectStore.ObjectMetadata(this.segmentMetadata(topicIdPartition, segment, false));
            this.putSegment((TierObjectStore)tierObjectStore, segment, metadata, Optional.empty());
            long targetOffset = 149L;
            PendingFetch pendingFetch = new PendingFetch(ctx, (TierObjectStore)tierObjectStore, new FetchOffsetCache(Time.SYSTEM, 10, 1000), Optional.empty(), metadata, key -> {}, targetOffset, 1024, segment.size(), IsolationLevel.READ_UNCOMMITTED, Collections.emptyList());
            this.currentThreadExecutor.execute((Runnable)pendingFetch);
            TierFetchResult result = (TierFetchResult)pendingFetch.finish().get(topicPartition);
            Assert.assertTrue((String)"Records should be complete", (boolean)result.records.batches().iterator().hasNext());
            Assert.assertNotEquals((String)"Should return records", (Object)result.records, (Object)MemoryRecords.EMPTY);
            RecordBatch firstRecordBatch = (RecordBatch)result.records.batches().iterator().next();
            Assert.assertTrue((String)"Results should include target offset in the first record batch", (firstRecordBatch.baseOffset() <= targetOffset && firstRecordBatch.lastOffset() >= targetOffset ? 1 : 0) != 0);
        }
        catch (IOException e) {
            e.printStackTrace();
            Assert.fail((String)"Unexpected exception");
        }
        finally {
            ctx.close();
            segment.close();
            tierObjectStore.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void targetOffsetOutOfRangeTest() {
        CancellationContext ctx = CancellationContext.newContext();
        MockInMemoryTierObjectStore tierObjectStore = new MockInMemoryTierObjectStore(new TierObjectStoreConfig("cluster", Integer.valueOf(1)));
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TopicPartition topicPartition = topicIdPartition.topicPartition();
        LogSegment segment = this.createSegment(0L, 3, 50);
        try {
            TierObjectStore.ObjectMetadata metadata = new TierObjectStore.ObjectMetadata(this.segmentMetadata(topicIdPartition, segment, false));
            this.putSegment((TierObjectStore)tierObjectStore, segment, metadata, Optional.empty());
            Long targetOffset = 150L;
            PendingFetch pendingFetch = new PendingFetch(ctx, (TierObjectStore)tierObjectStore, new FetchOffsetCache(Time.SYSTEM, 10, 1000), Optional.empty(), metadata, key -> {}, targetOffset.longValue(), 1024, segment.size(), IsolationLevel.READ_UNCOMMITTED, Collections.emptyList());
            this.currentThreadExecutor.execute((Runnable)pendingFetch);
            TierFetchResult result = (TierFetchResult)pendingFetch.finish().get(topicPartition);
            Assert.assertFalse((String)"Records should be incomplete", (boolean)result.records.batches().iterator().hasNext());
            Assert.assertEquals((String)"Should return empty records", (Object)result.records, (Object)MemoryRecords.EMPTY);
        }
        catch (IOException e) {
            e.printStackTrace();
            Assert.fail((String)"Unexpected exception");
        }
        finally {
            ctx.close();
            segment.close();
            tierObjectStore.close();
        }
    }

    private TierObjectMetadata segmentMetadata(TopicIdPartition topicIdPartition, LogSegment logSegment, boolean hasAbortedTxns) {
        return new TierObjectMetadata(topicIdPartition, 0, UUID.randomUUID(), logSegment.baseOffset(), logSegment.readNextOffset() - 1L, logSegment.largestTimestamp(), logSegment.size(), TierObjectMetadata.State.SEGMENT_UPLOAD_INITIATE, false, hasAbortedTxns, true);
    }

    private MemoryRecords createRecords(long offset, int n) {
        ByteBuffer buffer = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder builder = MemoryRecords.builder((ByteBuffer)buffer, (byte)2, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME, (long)offset, (long)0L);
        IntStream.range(0, n).forEach(i -> builder.appendWithOffset(offset + (long)i, this.baseTimestamp + offset, "a".getBytes(), "v".getBytes()));
        return builder.build();
    }

    @Test
    public void testSerializingAbortedTransactions() {
        ByteBuffer buf = this.serializeAbortedTxns(new AbortedTxn(0L, 0L, 5L, 0L), new AbortedTxn(0L, 10L, 20L, 0L), new AbortedTxn(0L, 50L, 100L, 0L)).get();
        List read = TierAbortedTxnReader.readInto((CancellationContext)CancellationContext.newContext(), (InputStream)new ByteBufferInputStream(buf), (long)0L, (long)100L);
        Assert.assertEquals(Arrays.asList(new AbortedTxn(0L, 0L, 5L, 0L), new AbortedTxn(0L, 10L, 20L, 0L), new AbortedTxn(0L, 50L, 100L, 0L)), (Object)read);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testReadCommittedEmptyBatch() {
        CancellationContext ctx = CancellationContext.newContext();
        MockInMemoryTierObjectStore tierObjectStore = new MockInMemoryTierObjectStore(new TierObjectStoreConfig("cluster", Integer.valueOf(1)));
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TopicPartition topicPartition = topicIdPartition.topicPartition();
        LogSegment segment = this.createSegment(0L, 0, 0);
        try {
            Optional<ByteBuffer> serializedAbortedTxns = this.serializeAbortedTxns(new AbortedTxn(0L, 0L, 5L, 0L), new AbortedTxn(0L, 10L, 20L, 0L), new AbortedTxn(0L, 50L, 100L, 0L), new AbortedTxn(0L, 101L, 150L, 0L));
            TierObjectStore.ObjectMetadata metadata = new TierObjectStore.ObjectMetadata(this.segmentMetadata(topicIdPartition, segment, serializedAbortedTxns.isPresent()));
            this.putSegment((TierObjectStore)tierObjectStore, segment, metadata, serializedAbortedTxns);
            long targetOffset = 0L;
            PendingFetch pendingFetch = new PendingFetch(ctx, (TierObjectStore)tierObjectStore, new FetchOffsetCache(Time.SYSTEM, 10, 1000), Optional.empty(), metadata, key -> {}, targetOffset, 1024, segment.size(), IsolationLevel.READ_COMMITTED, Collections.emptyList());
            this.currentThreadExecutor.execute((Runnable)pendingFetch);
            TierFetchResult result = (TierFetchResult)pendingFetch.finish().get(topicPartition);
            Assert.assertFalse((String)"expected to find 0 records", (boolean)result.records.records().iterator().hasNext());
            Assert.assertEquals((String)"expected to find 0 aborted transactions overlapping the fetched range", Collections.emptyList(), (Object)result.abortedTxns);
        }
        catch (IOException e) {
            e.printStackTrace();
            Assert.fail((String)"Unexpected exception");
        }
        finally {
            ctx.close();
            segment.close();
            tierObjectStore.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFetchingReadCommitted() {
        CancellationContext ctx = CancellationContext.newContext();
        MockInMemoryTierObjectStore tierObjectStore = new MockInMemoryTierObjectStore(new TierObjectStoreConfig("cluster", Integer.valueOf(1)));
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TopicPartition topicPartition = topicIdPartition.topicPartition();
        LogSegment segment = this.createSegment(0L, 3, 50);
        try {
            Optional<ByteBuffer> serializedAbortedTxns = this.serializeAbortedTxns(new AbortedTxn(0L, 0L, 5L, 0L), new AbortedTxn(0L, 10L, 20L, 0L), new AbortedTxn(0L, 50L, 100L, 0L), new AbortedTxn(0L, 101L, 150L, 0L));
            TierObjectStore.ObjectMetadata metadata = new TierObjectStore.ObjectMetadata(this.segmentMetadata(topicIdPartition, segment, serializedAbortedTxns.isPresent()));
            this.putSegment((TierObjectStore)tierObjectStore, segment, metadata, serializedAbortedTxns);
            long targetOffset = 0L;
            PendingFetch pendingFetch = new PendingFetch(ctx, (TierObjectStore)tierObjectStore, new FetchOffsetCache(Time.SYSTEM, 10, 1000), Optional.empty(), metadata, key -> {}, targetOffset, 1024, segment.size(), IsolationLevel.READ_COMMITTED, Collections.emptyList());
            this.currentThreadExecutor.execute((Runnable)pendingFetch);
            TierFetchResult result = (TierFetchResult)pendingFetch.finish().get(topicPartition);
            int recordCount = 0;
            for (Record record : result.records.records()) {
                Assert.assertTrue((String)"Expected all records to be valid", (boolean)record.isValid());
                ++recordCount;
            }
            Assert.assertEquals((String)"expected to find 100 records", (long)100L, (long)recordCount);
            Assert.assertEquals((String)"expected to find the 3 aborted transactions overlapping the fetch range", Arrays.asList(new AbortedTxn(0L, 0L, 5L, 0L), new AbortedTxn(0L, 10L, 20L, 0L), new AbortedTxn(0L, 50L, 100L, 0L)), (Object)result.abortedTxns);
        }
        catch (IOException e) {
            e.printStackTrace();
            Assert.fail((String)"Unexpected exception");
        }
        finally {
            ctx.close();
            segment.close();
            tierObjectStore.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testFetchingReadCommittedException() {
        CancellationContext ctx = CancellationContext.newContext();
        MockInMemoryTierObjectStore tierObjectStore = new MockInMemoryTierObjectStore(new TierObjectStoreConfig("cluster", Integer.valueOf(1)));
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TopicPartition topicPartition = topicIdPartition.topicPartition();
        LogSegment segment = this.createSegment(0L, 3, 50);
        try {
            Optional<ByteBuffer> serializedAbortedTxns = this.serializeAbortedTxns(new AbortedTxn(0L, 0L, 5L, 0L), new AbortedTxn(0L, 10L, 20L, 0L), new AbortedTxn(0L, 50L, 100L, 0L), new AbortedTxn(0L, 101L, 150L, 0L));
            TierObjectStore.ObjectMetadata metadata = new TierObjectStore.ObjectMetadata(this.segmentMetadata(topicIdPartition, segment, serializedAbortedTxns.isPresent()));
            this.putSegment((TierObjectStore)tierObjectStore, segment, metadata, serializedAbortedTxns);
            tierObjectStore.throwExceptionOnTransactionFetch = true;
            PendingFetch pendingFetch = new PendingFetch(ctx, (TierObjectStore)tierObjectStore, new FetchOffsetCache(Time.SYSTEM, 10, 1000), Optional.empty(), metadata, key -> {}, 0L, 1024, segment.size(), IsolationLevel.READ_COMMITTED, Collections.emptyList());
            this.currentThreadExecutor.execute((Runnable)pendingFetch);
            TierFetchResult result = (TierFetchResult)pendingFetch.finish().get(topicPartition);
            Assert.assertFalse((String)"Expected to find 0 records, because an exception was thrown", (boolean)result.records.records().iterator().hasNext());
            Assert.assertTrue((boolean)(result.exception instanceof IOException));
            Assert.assertEquals((String)"Expected to find 0 aborted transaction because an exception was thrown", Collections.emptyList(), (Object)result.abortedTxns);
            tierObjectStore.throwExceptionOnTransactionFetch = false;
            tierObjectStore.throwExceptionOnSegmentFetch = true;
            pendingFetch = new PendingFetch(ctx, (TierObjectStore)tierObjectStore, new FetchOffsetCache(Time.SYSTEM, 10, 1000), Optional.empty(), metadata, key -> {}, 0L, 1024, segment.size(), IsolationLevel.READ_COMMITTED, Collections.emptyList());
            this.currentThreadExecutor.execute((Runnable)pendingFetch);
            result = (TierFetchResult)pendingFetch.finish().get(topicPartition);
            Assert.assertFalse((String)"Expected to find 0 records, because an exception was thrown", (boolean)result.records.records().iterator().hasNext());
            Assert.assertTrue((boolean)(result.exception instanceof IOException));
            Assert.assertEquals((String)"Expected to find 0 aborted transaction because an exception was thrown", Collections.emptyList(), (Object)result.abortedTxns);
        }
        catch (IOException e) {
            e.printStackTrace();
            Assert.fail((String)"Unexpected exception");
        }
        finally {
            ctx.close();
            segment.close();
            tierObjectStore.close();
        }
    }

    Optional<ByteBuffer> serializeAbortedTxns(AbortedTxn ... abortedTxns) {
        return OptionConverters.toJava((Option)Log.serializeAbortedTransactions((Seq)JavaConversions.asScalaBuffer(Arrays.asList(abortedTxns))));
    }

    private LogSegment createSegment(long baseOffset, int batches, int recsPerBatch) {
        File logSegmentDir = TestUtils.tempDirectory();
        logSegmentDir.deleteOnExit();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), (Object)1);
        LogConfig logConfig = LogConfig.apply((Map)logProps, (Set)new HashSet());
        LogSegment segment = LogSegment.open((File)logSegmentDir, (long)baseOffset, (LogConfig)logConfig, (Time)this.mockTime, (boolean)false, (int)4096, (boolean)false, (String)"");
        IntStream.range(0, batches).forEach(i -> {
            long nextOffset = segment.readNextOffset();
            MemoryRecords recs = this.createRecords(nextOffset, recsPerBatch);
            long largestOffset = nextOffset + (long)recsPerBatch - 1L;
            segment.append(largestOffset, this.baseTimestamp + largestOffset, (long)(recsPerBatch - 1), recs);
            segment.flush();
        });
        segment.offsetIndex().flush();
        segment.offsetIndex().trimToValidSize();
        return segment;
    }

    private void putSegment(TierObjectStore tierObjectStore, LogSegment segment, TierObjectStore.ObjectMetadata metadata, Optional<ByteBuffer> abortedTxns) throws IOException {
        tierObjectStore.putSegment(metadata, segment.log().file(), segment.offsetIndex().file(), segment.timeIndex().file(), Optional.empty(), abortedTxns, Optional.of(segment.timeIndex().file()));
    }
}

