/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.fetcher;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import junit.framework.TestCase;
import kafka.log.LogConfig;
import kafka.log.LogSegment;
import kafka.server.DelayedOperation;
import kafka.tier.TierTimestampAndOffset;
import kafka.tier.TopicIdPartition;
import kafka.tier.fetcher.PendingFetch;
import kafka.tier.fetcher.PendingOffsetForTimestamp;
import kafka.tier.fetcher.TierFetchMetadata;
import kafka.tier.fetcher.TierFetchResult;
import kafka.tier.fetcher.TierFetcher;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import kafka.utils.KafkaScheduler;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import scala.Option;
import scala.collection.JavaConverters;

public class TierFetcherTest {
    // Mock clock shared by the tests; passed as the Time argument to LogSegment.open.
    private MockTime mockTime = new MockTime();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Arms the mocked object store to fail its next request, issues a fetch, and verifies that
     * the completion callback still runs, that the bytes-fetched metric stays at zero, and that
     * the fetch-exception metric moves from zero to one.
     */
    @Test
    public void tierFetcherExceptionCausesOnComplete() throws Exception {
        MockedTierObjectStore store = new MockedTierObjectStore(ByteBuffer.allocate(1), ByteBuffer.allocate(1), ByteBuffer.allocate(1));
        TopicIdPartition partition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TierObjectStore.ObjectMetadata objectMetadata = new TierObjectStore.ObjectMetadata(partition, UUID.randomUUID(), 0, 0L, false);
        Metrics metrics = new Metrics();
        KafkaScheduler scheduler = EasyMock.createNiceMock(KafkaScheduler.class);
        try (TierFetcher tierFetcher = new TierFetcher((TierObjectStore) store, scheduler, metrics)) {
            TierFetchMetadata fetchMetadata = new TierFetchMetadata(partition.topicPartition(), 0L, Integer.valueOf(600), 1000L, true, objectMetadata, Option.empty(), 0L, 1000);
            CompletableFuture<Boolean> callbackFired = new CompletableFuture<>();
            store.failNextRequest();

            // Both counters are expected to start at zero.
            double fetchedBefore = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
            Assert.assertEquals(0.0, fetchedBefore, 0.0);
            double exceptionsBefore = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchExceptionTotalMetricName).metricValue();
            Assert.assertEquals(0.0, exceptionsBefore, 0.0);

            tierFetcher.fetch(new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)), IsolationLevel.READ_UNCOMMITTED, ignored -> callbackFired.complete(true));
            TestCase.assertTrue(callbackFired.get(2000L, TimeUnit.MILLISECONDS));

            // After the failed request: still no bytes fetched, exactly one exception recorded.
            double fetchedAfter = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
            Assert.assertEquals(0.0, fetchedAfter, 0.0);
            double exceptionsAfter = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchExceptionTotalMetricName).metricValue();
            Assert.assertEquals(1.0, exceptionsAfter, 0.0);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Issues a fetch, cancels the returned pending fetch, and verifies that the completion
     * callback runs, that neither the bytes-fetched nor the fetch-exception metric moves, and
     * that the fetch-cancellation metric goes from zero to one.
     */
    @Test
    public void tierFetcherFetchCancelled() throws Exception {
        MockedTierObjectStore store = new MockedTierObjectStore(ByteBuffer.allocate(1), ByteBuffer.allocate(1), ByteBuffer.allocate(1));
        TopicIdPartition partition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TierObjectStore.ObjectMetadata objectMetadata = new TierObjectStore.ObjectMetadata(partition, UUID.randomUUID(), 0, 0L, false);
        Metrics metrics = new Metrics();
        KafkaScheduler scheduler = EasyMock.createNiceMock(KafkaScheduler.class);
        try (TierFetcher tierFetcher = new TierFetcher((TierObjectStore) store, scheduler, metrics)) {
            TierFetchMetadata fetchMetadata = new TierFetchMetadata(partition.topicPartition(), 0L, Integer.valueOf(600), 1000L, true, objectMetadata, Option.empty(), 0L, 1000);
            CompletableFuture<Boolean> callbackFired = new CompletableFuture<>();

            // Counters are expected to start at zero.
            double fetchedBefore = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
            Assert.assertEquals(0.0, fetchedBefore, 0.0);
            double cancellationsBefore = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchCancellationTotalMetricName).metricValue();
            Assert.assertEquals(0.0, cancellationsBefore, 0.0);

            PendingFetch inFlight = tierFetcher.fetch(new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)), IsolationLevel.READ_UNCOMMITTED, ignored -> callbackFired.complete(true));
            inFlight.cancel();
            TestCase.assertTrue(callbackFired.get(2000L, TimeUnit.MILLISECONDS));

            // Cancellation is counted on its own metric, not as bytes fetched or as an exception.
            double fetchedAfter = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
            Assert.assertEquals(0.0, fetchedAfter, 0.0);
            double exceptionsAfter = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchExceptionTotalMetricName).metricValue();
            Assert.assertEquals(0.0, exceptionsAfter, 0.0);
            double cancellationsAfter = (Double) metrics.metric(tierFetcher.tierFetcherMetrics.fetchCancellationTotalMetricName).metricValue();
            Assert.assertEquals(1.0, cancellationsAfter, 0.0);
        }
    }

    /**
     * Builds two record batches (offsets 0-49 and 50-100, every record with timestamp 1, key
     * "a" and value "v") in separate 2048-byte buffers and concatenates them into one buffer.
     *
     * @return a flipped buffer holding the first batch's bytes followed by the second's
     */
    private ByteBuffer getMemoryRecordsBuffer() {
        ByteBuffer firstBatchBytes = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder firstBatch = MemoryRecords.builder(firstBatchBytes, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        ByteBuffer secondBatchBytes = ByteBuffer.allocate(2048);
        MemoryRecordsBuilder secondBatch = MemoryRecords.builder(secondBatchBytes, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);

        for (int offset = 0; offset < 50; ++offset) {
            firstBatch.appendWithOffset((long) offset, 1L, "a".getBytes(), "v".getBytes());
        }
        for (int offset = 50; offset < 101; ++offset) {
            secondBatch.appendWithOffset((long) offset, 1L, "a".getBytes(), "v".getBytes());
        }
        firstBatch.build();
        secondBatch.build();

        // Flip each source buffer before it is copied into the combined buffer.
        firstBatchBytes.flip();
        secondBatchBytes.flip();

        ByteBuffer combined = ByteBuffer.allocate(4 * 1024 * 1024);
        combined.put(firstBatchBytes);
        combined.put(secondBatchBytes);
        combined.flip();
        return combined;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Fetches from offset 0 against a mocked object store whose offset and timestamp index
     * buffers are empty, and verifies that the fetch completes, that the bytes-fetched metric
     * becomes positive, and that the returned records carry consecutive offsets starting at 0.
     */
    @Test
    public void tierFetcherRequestEmptyIndexTest() throws Exception {
        ByteBuffer combinedBuffer = this.getMemoryRecordsBuffer();
        MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(combinedBuffer, ByteBuffer.allocate(0), ByteBuffer.allocate(0));
        TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
        TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false);
        Metrics metrics = new Metrics();
        KafkaScheduler kafkaScheduler = EasyMock.createNiceMock(KafkaScheduler.class);
        try (TierFetcher tierFetcher = new TierFetcher((TierObjectStore) tierObjectStore, kafkaScheduler, metrics)) {
            TierFetchMetadata fetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 0L, Integer.valueOf(10000), 1000L, true, tierObjectMetadata, Option.empty(), 0L, 1000);
            CompletableFuture<Boolean> f = new CompletableFuture<>();
            Assert.assertEquals((Object) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue(), (Object) 0.0);
            PendingFetch pending = tierFetcher.fetch(new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)), IsolationLevel.READ_UNCOMMITTED, ignored -> f.complete(true));
            MockDelayedFetch delayedFetch = new MockDelayedFetch(pending);
            TestCase.assertTrue(f.get(2000L, TimeUnit.MILLISECONDS));
            Map<?, ?> fetchResults = pending.finish();
            Assert.assertNotNull("expected non-null fetch result", fetchResults);
            TestCase.assertTrue((Double) metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue() > 0.0);
            TestCase.assertTrue(delayedFetch.tryComplete());
            TierFetchResult fetchResult = (TierFetchResult) fetchResults.get(topicIdPartition.topicPartition());
            Records records = fetchResult.records;
            long expectedOffset = 0L;
            for (Record record : records.records()) {
                // JUnit argument order is (message, expected, actual); the running counter is
                // the expected value, as in the other fetch tests in this class.
                Assert.assertEquals("Offset not expected", expectedOffset, record.offset());
                ++expectedOffset;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void tierFetcherLocateTargetOffsetTest() throws Exception {
        MemoryRecords[] recordArr = new MemoryRecords[]{this.buildWithOffset(0L), this.buildWithOffset(50L), this.buildWithOffset(100L), this.buildWithOffset(150L), this.buildWithOffset(200L)};
        int indexInterval = recordArr[0].sizeInBytes() + 1;
        File logSegmentDir = TestUtils.tempDirectory();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), (Object)indexInterval);
        Set override = Collections.emptySet();
        LogConfig logConfig = LogConfig.apply((Map)logProps, (scala.collection.immutable.Set)((scala.collection.mutable.Set)JavaConverters.asScalaSetConverter(override).asScala()).toSet());
        LogSegment logSegment = LogSegment.open((File)logSegmentDir, (long)0L, (LogConfig)logConfig, (Time)this.mockTime, (boolean)false, (int)4096, (boolean)false, (String)"");
        try {
            for (MemoryRecords records2 : recordArr) {
                logSegment.append(((MutableRecordBatch)records2.batches().iterator().next()).baseOffset(), 1L, 1L, records2);
            }
            logSegment.flush();
            logSegment.offsetIndex().flush();
            logSegment.offsetIndex().trimToValidSize();
            long expectedEndOffset = logSegment.readNextOffset() - 1L;
            File offsetIndexFile = logSegment.offsetIndex().file();
            ByteBuffer offsetIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(offsetIndexFile.toPath()));
            File timestampIndexFile = logSegment.offsetIndex().file();
            ByteBuffer timestampIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(timestampIndexFile.toPath()));
            File segmentFile = logSegment.log().file();
            ByteBuffer segmentFileBuffer = ByteBuffer.wrap(Files.readAllBytes(segmentFile.toPath()));
            MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentFileBuffer, offsetIndexBuffer, timestampIndexBuffer);
            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            UUID objectId = UUID.randomUUID();
            TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, objectId, 0, 0L, false);
            Metrics metrics = new Metrics();
            KafkaScheduler kafkaScheduler = (KafkaScheduler)EasyMock.createNiceMock(KafkaScheduler.class);
            try (TierFetcher tierFetcher = new TierFetcher((TierObjectStore)tierObjectStore, kafkaScheduler, metrics);){
                int expectedCacheEntries = 0;
                long fetchOffset = 150L;
                while (fetchOffset < expectedEndOffset) {
                    TierFetchMetadata fetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), fetchOffset, Integer.valueOf(1000), 1000L, true, tierObjectMetadata, Option.empty(), 0L, segmentFileBuffer.limit());
                    CompletableFuture f = new CompletableFuture();
                    PendingFetch pending = tierFetcher.fetch(new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)), IsolationLevel.READ_UNCOMMITTED, ignored -> f.complete(true));
                    MockDelayedFetch delayedFetch = new MockDelayedFetch(pending);
                    TestCase.assertTrue((boolean)((Boolean)f.get(4000L, TimeUnit.MILLISECONDS)));
                    Map fetchResults = pending.finish();
                    Assert.assertNotNull((String)"expected non-null fetch result", (Object)fetchResults);
                    TestCase.assertTrue(((Double)metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue() > 0.0 ? 1 : 0) != 0);
                    TestCase.assertTrue((boolean)delayedFetch.tryComplete());
                    TierFetchResult fetchResult = (TierFetchResult)fetchResults.get(topicIdPartition.topicPartition());
                    Records records3 = fetchResult.records;
                    for (Record record : records3.records()) {
                        Assert.assertEquals((String)"Offset not expected", (long)fetchOffset, (long)record.offset());
                        ++fetchOffset;
                    }
                    if (fetchOffset < expectedEndOffset) {
                        ++expectedCacheEntries;
                    }
                    long expected = expectedCacheEntries;
                    TestUtils.waitForCondition(() -> expected == tierFetcher.cache.size(), (String)"cache not updated by timeout");
                }
                Assert.assertEquals((long)(fetchOffset - 1L), (long)expectedEndOffset);
                Assert.assertEquals((String)"offset index should have been used exactly once, for the initial fetch", (long)1L, (long)tierObjectStore.offsetIndexReads);
            }
        }
        finally {
            logSegment.deleteIfExists();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void tierFetcherRepeatedFetchesViaOffsetCacheTest() throws Exception {
        File logSegmentDir = TestUtils.tempDirectory();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), (Object)1);
        Set override = Collections.emptySet();
        LogConfig logConfig = LogConfig.apply((Map)logProps, (scala.collection.immutable.Set)((scala.collection.mutable.Set)JavaConverters.asScalaSetConverter(override).asScala()).toSet());
        LogSegment logSegment = LogSegment.open((File)logSegmentDir, (long)0L, (LogConfig)logConfig, (Time)this.mockTime, (boolean)false, (int)4096, (boolean)false, (String)"");
        try {
            logSegment.append(logSegment.readNextOffset() + 49L, 1L, 1L, this.buildWithOffset(logSegment.readNextOffset()));
            logSegment.flush();
            logSegment.append(logSegment.readNextOffset() + 49L, 1L, 1L, this.buildWithOffset(logSegment.readNextOffset()));
            logSegment.flush();
            logSegment.append(logSegment.readNextOffset() + 49L, 1L, 1L, this.buildWithOffset(logSegment.readNextOffset()));
            logSegment.flush();
            logSegment.append(logSegment.readNextOffset() + 49L, 1L, 1L, this.buildWithOffset(logSegment.readNextOffset()));
            logSegment.flush();
            logSegment.offsetIndex().flush();
            logSegment.offsetIndex().trimToValidSize();
            long expectedEndOffset = logSegment.readNextOffset() - 1L;
            File offsetIndexFile = logSegment.offsetIndex().file();
            ByteBuffer offsetIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(offsetIndexFile.toPath()));
            File timestampIndexFile = logSegment.offsetIndex().file();
            ByteBuffer timestampIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(timestampIndexFile.toPath()));
            File segmentFile = logSegment.log().file();
            ByteBuffer segmentFileBuffer = ByteBuffer.wrap(Files.readAllBytes(segmentFile.toPath()));
            MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentFileBuffer, offsetIndexBuffer, timestampIndexBuffer);
            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            UUID objectId = UUID.randomUUID();
            TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, objectId, 0, 0L, false);
            Metrics metrics = new Metrics();
            KafkaScheduler kafkaScheduler = (KafkaScheduler)EasyMock.createNiceMock(KafkaScheduler.class);
            try (TierFetcher tierFetcher = new TierFetcher((TierObjectStore)tierObjectStore, kafkaScheduler, metrics);){
                int expectedCacheEntries = 0;
                long fetchOffset = 0L;
                while (fetchOffset < expectedEndOffset) {
                    TierFetchMetadata fetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), fetchOffset, Integer.valueOf(1000), 1000L, true, tierObjectMetadata, Option.empty(), 0L, segmentFileBuffer.limit());
                    CompletableFuture f = new CompletableFuture();
                    PendingFetch pending = tierFetcher.fetch(new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)), IsolationLevel.READ_UNCOMMITTED, ignored -> f.complete(true));
                    MockDelayedFetch delayedFetch = new MockDelayedFetch(pending);
                    TestCase.assertTrue((boolean)((Boolean)f.get(4000L, TimeUnit.MILLISECONDS)));
                    Map fetchResults = pending.finish();
                    Assert.assertNotNull((String)"expected non-null fetch result", (Object)fetchResults);
                    TestCase.assertTrue(((Double)metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue() > 0.0 ? 1 : 0) != 0);
                    TestCase.assertTrue((boolean)delayedFetch.tryComplete());
                    TierFetchResult fetchResult = (TierFetchResult)fetchResults.get(topicIdPartition.topicPartition());
                    Records records2 = fetchResult.records;
                    for (Record record : records2.records()) {
                        Assert.assertEquals((String)"Offset not expected", (long)fetchOffset, (long)record.offset());
                        ++fetchOffset;
                    }
                    if (fetchOffset < expectedEndOffset) {
                        ++expectedCacheEntries;
                    }
                    long expected = expectedCacheEntries;
                    TestUtils.waitForCondition(() -> expected == tierFetcher.cache.size(), (String)"cache not updated by timeout");
                }
                Assert.assertEquals((long)(fetchOffset - 1L), (long)expectedEndOffset);
                Assert.assertEquals((double)1.0, (double)tierFetcher.cache.hitRatio(), (double)1.0E-4);
                Assert.assertEquals((String)"offset index should not have been used", (long)0L, (long)tierObjectStore.offsetIndexReads);
            }
        }
        finally {
            logSegment.deleteIfExists();
        }
    }

    /**
     * Builds a single uncompressed CREATE_TIME batch (magic value 2) of 50 records in a
     * 2048-byte buffer. Record {@code i} gets offset and timestamp {@code baseOffset + i},
     * key "a" and value "v".
     *
     * @param baseOffset offset of the first record, also used as the batch base offset
     * @return the built records
     */
    private MemoryRecords buildWithOffset(long baseOffset) {
        MemoryRecordsBuilder batch = MemoryRecords.builder(ByteBuffer.allocate(2048), (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset);
        for (int i = 0; i < 50; ++i) {
            long offsetAndTimestamp = baseOffset + (long) i;
            batch.appendWithOffset(offsetAndTimestamp, offsetAndTimestamp, "a".getBytes(), "v".getBytes());
        }
        return batch.build();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void tierFetcherIndexTest() throws Exception {
        File logSegmentDir = TestUtils.tempDirectory();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), (Object)1);
        Set override = Collections.emptySet();
        LogConfig logConfig = LogConfig.apply((Map)logProps, (scala.collection.immutable.Set)((scala.collection.mutable.Set)JavaConverters.asScalaSetConverter(override).asScala()).toSet());
        try (LogSegment logSegment = LogSegment.open((File)logSegmentDir, (long)0L, (LogConfig)logConfig, (Time)this.mockTime, (boolean)false, (int)4096, (boolean)false, (String)"");){
            logSegment.append(logSegment.readNextOffset() + 49L, 1L, 1L, this.buildWithOffset(logSegment.readNextOffset()));
            logSegment.flush();
            logSegment.append(logSegment.readNextOffset() + 49L, 1L, 1L, this.buildWithOffset(logSegment.readNextOffset()));
            logSegment.flush();
            logSegment.append(logSegment.readNextOffset() + 49L, 1L, 1L, this.buildWithOffset(logSegment.readNextOffset()));
            logSegment.flush();
            logSegment.offsetIndex().flush();
            logSegment.offsetIndex().trimToValidSize();
            File offsetIndexFile = logSegment.offsetIndex().file();
            ByteBuffer offsetIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(offsetIndexFile.toPath()));
            File timestampIndexFile = logSegment.offsetIndex().file();
            ByteBuffer timestampIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(timestampIndexFile.toPath()));
            File segmentFile = logSegment.log().file();
            ByteBuffer segmentFileBuffer = ByteBuffer.wrap(Files.readAllBytes(segmentFile.toPath()));
            MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentFileBuffer, offsetIndexBuffer, timestampIndexBuffer);
            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false);
            Metrics metrics = new Metrics();
            KafkaScheduler kafkaScheduler = (KafkaScheduler)EasyMock.createNiceMock(KafkaScheduler.class);
            try (TierFetcher tierFetcher = new TierFetcher((TierObjectStore)tierObjectStore, kafkaScheduler, metrics);){
                TierFetchMetadata fetchMetadata = new TierFetchMetadata(topicIdPartition.topicPartition(), 100L, Integer.valueOf(10000), 1000L, true, tierObjectMetadata, Option.empty(), 0L, 1000);
                CompletableFuture f = new CompletableFuture();
                Assert.assertEquals((Object)metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue(), (Object)0.0);
                PendingFetch pending = tierFetcher.fetch(new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)), IsolationLevel.READ_UNCOMMITTED, ignored -> f.complete(true));
                MockDelayedFetch delayedFetch = new MockDelayedFetch(pending);
                TestCase.assertTrue((boolean)((Boolean)f.get(2000L, TimeUnit.MILLISECONDS)));
                Map fetchResults = pending.finish();
                Assert.assertNotNull((String)"expected non-null fetch result", (Object)fetchResults);
                TestCase.assertTrue(((Double)metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue() > 0.0 ? 1 : 0) != 0);
                TestCase.assertTrue((boolean)delayedFetch.tryComplete());
                TierFetchResult fetchResult = (TierFetchResult)fetchResults.get(topicIdPartition.topicPartition());
                Records records2 = fetchResult.records;
                long lastOffset = 100L;
                for (Record record : records2.records()) {
                    Assert.assertEquals((String)"Offset not expected", (long)lastOffset, (long)record.offset());
                    ++lastOffset;
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void tierTimestampIndexTest() throws Exception {
        File logSegmentDir = TestUtils.tempDirectory();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), (Object)1);
        Set override = Collections.emptySet();
        LogConfig logConfig = LogConfig.apply((Map)logProps, (scala.collection.immutable.Set)((scala.collection.mutable.Set)JavaConverters.asScalaSetConverter(override).asScala()).toSet());
        try (LogSegment logSegment = LogSegment.open((File)logSegmentDir, (long)0L, (LogConfig)logConfig, (Time)this.mockTime, (boolean)false, (int)4096, (boolean)false, (String)"");){
            MemoryRecords records1 = this.buildWithOffset(logSegment.readNextOffset());
            long largestOffset1 = logSegment.readNextOffset() + 49L;
            logSegment.append(largestOffset1, largestOffset1, largestOffset1, records1);
            logSegment.flush();
            MemoryRecords records2 = this.buildWithOffset(logSegment.readNextOffset());
            long largestOffset2 = logSegment.readNextOffset() + 49L;
            logSegment.append(largestOffset2, largestOffset2, largestOffset2, records2);
            logSegment.flush();
            MemoryRecords records3 = this.buildWithOffset(logSegment.readNextOffset());
            long largestOffset3 = logSegment.readNextOffset() + 49L;
            logSegment.append(largestOffset3, largestOffset3, largestOffset3, records3);
            logSegment.flush();
            long largestOffset4 = logSegment.readNextOffset() + 49L;
            logSegment.append(largestOffset4, largestOffset4, largestOffset4, records3);
            logSegment.offsetIndex().flush();
            logSegment.offsetIndex().trimToValidSize();
            logSegment.timeIndex().flush();
            logSegment.timeIndex().trimToValidSize();
            File offsetIndexFile = logSegment.offsetIndex().file();
            ByteBuffer offsetIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(offsetIndexFile.toPath()));
            File timestampIndexFile = logSegment.timeIndex().file();
            ByteBuffer timestampIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(timestampIndexFile.toPath()));
            File segmentFile = logSegment.log().file();
            ByteBuffer segmentFileBuffer = ByteBuffer.wrap(Files.readAllBytes(segmentFile.toPath()));
            MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentFileBuffer, offsetIndexBuffer, timestampIndexBuffer);
            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false);
            Metrics metrics = new Metrics();
            KafkaScheduler kafkaScheduler = (KafkaScheduler)EasyMock.createNiceMock(KafkaScheduler.class);
            try (TierFetcher tierFetcher = new TierFetcher((TierObjectStore)tierObjectStore, kafkaScheduler, metrics);){
                CompletableFuture f = new CompletableFuture();
                HashMap<TopicPartition, TierTimestampAndOffset> timestamps = new HashMap<TopicPartition, TierTimestampAndOffset>();
                timestamps.put(topicIdPartition.topicPartition(), new TierTimestampAndOffset(101L, tierObjectMetadata, segmentFileBuffer.limit()));
                PendingOffsetForTimestamp pending = tierFetcher.fetchOffsetForTimestamp(timestamps, ignored -> f.complete(true));
                f.get(2000L, TimeUnit.MILLISECONDS);
                Assert.assertEquals((String)"incorrect offset for supplied timestamp returned", Optional.of(new FileRecords.FileTimestampAndOffset(101L, 101L, Optional.empty())), pending.results().get(topicIdPartition.topicPartition()));
                tierObjectStore.failNextRequest();
                f = new CompletableFuture();
                timestamps = new HashMap();
                timestamps.put(topicIdPartition.topicPartition(), new TierTimestampAndOffset(101L, tierObjectMetadata, segmentFileBuffer.limit()));
                pending = tierFetcher.fetchOffsetForTimestamp(timestamps, ignored -> f.complete(true));
                Assert.assertEquals((double)0.0, (double)((Double)metrics.metric(tierFetcher.tierFetcherMetrics.fetchOffsetForTimestampExceptionTotalMetricName).metricValue()), (double)0.0);
                f.get(2000L, TimeUnit.MILLISECONDS);
                Assert.assertNotNull((String)"tier object store through exception, pending result should have been completed exceptionally", (Object)((FileRecords.FileTimestampAndOffset)((Optional)pending.results().get((Object)topicIdPartition.topicPartition())).get()).exception);
                Assert.assertEquals((double)1.0, (double)((Double)metrics.metric(tierFetcher.tierFetcherMetrics.fetchOffsetForTimestampExceptionTotalMetricName).metricValue()), (double)0.0);
            }
        }
    }

    /**
     * Verifies that a tiered fetch honours its {@code maxBytes} limit.
     *
     * <p>Three batches are appended to a local log segment; each append passes a largest
     * offset 49 past the segment's next offset. The segment file and its offset/timestamp
     * indexes are read into memory and served through a {@link MockedTierObjectStore}.
     * A fetch from offset 0 with {@code maxBytes = 600} must return records whose total
     * size does not exceed the limit and whose offsets are exactly 0 through 49.
     *
     * <p>NOTE: this source was recovered by a decompiler, which warned that the
     * exception-handling structure of this method may differ from the original.
     */
    @Test
    public void tierFetcherMaxBytesTest() throws Exception {
        File logSegmentDir = TestUtils.tempDirectory();
        Properties logProps = new Properties();
        logProps.put(LogConfig.IndexIntervalBytesProp(), 1);
        Set<String> override = Collections.emptySet();
        LogConfig logConfig = LogConfig.apply((Map)logProps, (scala.collection.immutable.Set)((scala.collection.mutable.Set)JavaConverters.asScalaSetConverter(override).asScala()).toSet());
        try (LogSegment logSegment = LogSegment.open(logSegmentDir, 0L, logConfig, this.mockTime, false, 4096, false, "")) {
            // Append three batches, flushing after each one.
            for (int batch = 0; batch < 3; batch++) {
                logSegment.append(logSegment.readNextOffset() + 49L, 1L, 1L, this.buildWithOffset(logSegment.readNextOffset()));
                logSegment.flush();
            }
            logSegment.offsetIndex().flush();
            logSegment.offsetIndex().trimToValidSize();

            // Snapshot the on-disk files so the mocked object store can serve them from memory.
            ByteBuffer offsetIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(logSegment.offsetIndex().file().toPath()));
            ByteBuffer timestampIndexBuffer = ByteBuffer.wrap(Files.readAllBytes(logSegment.timeIndex().file().toPath()));
            ByteBuffer segmentFileBuffer = ByteBuffer.wrap(Files.readAllBytes(logSegment.log().file().toPath()));
            MockedTierObjectStore tierObjectStore = new MockedTierObjectStore(segmentFileBuffer, offsetIndexBuffer, timestampIndexBuffer);

            TopicIdPartition topicIdPartition = new TopicIdPartition("foo", UUID.randomUUID(), 0);
            TopicPartition topicPartition = topicIdPartition.topicPartition();
            TierObjectStore.ObjectMetadata tierObjectMetadata = new TierObjectStore.ObjectMetadata(topicIdPartition, UUID.randomUUID(), 0, 0L, false);
            KafkaScheduler kafkaScheduler = EasyMock.createNiceMock(KafkaScheduler.class);

            // Metrics is Closeable; declaring it first means it is closed after the fetcher.
            try (Metrics metrics = new Metrics();
                 TierFetcher tierFetcher = new TierFetcher(tierObjectStore, kafkaScheduler, metrics)) {
                int maxBytes = 600;
                TierFetchMetadata fetchMetadata = new TierFetchMetadata(topicPartition, 0L, Integer.valueOf(maxBytes), 1000L, true, tierObjectMetadata, Option.empty(), 0L, 1000);
                CompletableFuture<Boolean> fetchCompleted = new CompletableFuture<>();

                // Nothing has been fetched yet, so the byte counter must still be zero.
                Assert.assertEquals((Object) 0.0, metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue());

                PendingFetch pending = tierFetcher.fetch(new ArrayList<TierFetchMetadata>(Collections.singletonList(fetchMetadata)), IsolationLevel.READ_UNCOMMITTED, ignored -> fetchCompleted.complete(true));
                MockDelayedFetch delayedFetch = new MockDelayedFetch(pending);
                Assert.assertTrue("fetch completion callback did not report success", fetchCompleted.get(2000L, TimeUnit.MILLISECONDS));

                Map<?, ?> fetchResults = pending.finish();
                Assert.assertNotNull("expected non-null fetch result", fetchResults);
                Object bytesFetched = metrics.metric(tierFetcher.tierFetcherMetrics.bytesFetchedTotalMetricName).metricValue();
                Assert.assertTrue("expected bytes fetched metric to be positive", ((Double) bytesFetched) > 0.0);
                Assert.assertTrue("expected the delayed fetch to complete", delayedFetch.tryComplete());

                TierFetchResult fetchResult = (TierFetchResult) fetchResults.get(topicPartition);
                // Fail with a clear message instead of a NullPointerException below.
                Assert.assertNotNull("expected a fetch result for " + topicPartition, fetchResult);
                Records records = fetchResult.records;
                Assert.assertTrue("fetched records exceed maxBytes", records.sizeInBytes() <= maxBytes);

                // Offsets must be contiguous starting at 0.
                long expectedOffset = 0L;
                for (Record record : records.records()) {
                    Assert.assertEquals("Offset not expected", expectedOffset, record.offset());
                    ++expectedOffset;
                }
                Assert.assertEquals("When we set maxBytes low, we just read the first 50 records successfully.", 50L, expectedOffset);
            }
        }
    }

    class MockedTierObjectStore
    implements TierObjectStore {
        private final ByteBuffer segmentByteBuffer;
        private final ByteBuffer offsetByteBuffer;
        private final ByteBuffer timestampByteBuffer;
        private final AtomicBoolean failNextRequest = new AtomicBoolean(false);
        int segmentReads = 0;
        int offsetIndexReads = 0;
        int timestampIndexReads = 0;

        MockedTierObjectStore(ByteBuffer segmentByteBuffer, ByteBuffer indexByteBuffer, ByteBuffer timestampByteBuffer) {
            this.segmentByteBuffer = segmentByteBuffer;
            this.offsetByteBuffer = indexByteBuffer;
            this.timestampByteBuffer = timestampByteBuffer;
        }

        void failNextRequest() {
            this.failNextRequest.set(true);
        }

        public void close() {
        }

        public TierObjectStoreResponse getObject(TierObjectStore.ObjectMetadata tierObjectMetadata, TierObjectStore.FileType fileType, Integer byteOffset, Integer byteOffsetEnd) throws IOException {
            ByteBuffer buffer;
            if (this.failNextRequest.compareAndSet(true, false)) {
                throw new IOException("Failed to retrieve object.");
            }
            if (fileType == TierObjectStore.FileType.OFFSET_INDEX) {
                ++this.offsetIndexReads;
                buffer = this.offsetByteBuffer;
            } else if (fileType == TierObjectStore.FileType.SEGMENT) {
                ++this.segmentReads;
                buffer = this.segmentByteBuffer;
            } else if (fileType == TierObjectStore.FileType.TIMESTAMP_INDEX) {
                ++this.timestampIndexReads;
                buffer = this.timestampByteBuffer;
            } else {
                throw new UnsupportedOperationException();
            }
            int start2 = byteOffset == null ? 0 : byteOffset;
            int end = byteOffsetEnd == null ? buffer.limit() : Math.min(byteOffsetEnd, buffer.limit());
            int byteBufferSize = Math.min(end - start2, buffer.array().length);
            ByteBuffer buf = ByteBuffer.allocate(byteBufferSize);
            buf.put(buffer.array(), start2, byteBufferSize);
            buf.flip();
            return new MockTierObjectStoreResponse((InputStream)new ByteBufferInputStream(buf));
        }

        public void putSegment(TierObjectStore.ObjectMetadata objectMetadata, File segmentData, File offsetIndexData, File timestampIndexData, Optional<File> producerStateSnapshotData, Optional<ByteBuffer> transactionIndexData, Optional<File> epochState) {
            throw new UnsupportedOperationException();
        }

        public void deleteSegment(TierObjectStore.ObjectMetadata objectMetadata) {
        }

        class MockTierObjectStoreResponse
        implements TierObjectStoreResponse {
            private final InputStream is;

            MockTierObjectStoreResponse(InputStream is) {
                this.is = is;
            }

            public InputStream getInputStream() {
                return this.is;
            }

            public void close() {
            }
        }
    }

    class MockDelayedFetch
    extends DelayedOperation {
        PendingFetch fetch;

        MockDelayedFetch(PendingFetch fetch) {
            super(0L, Option.empty());
            this.fetch = fetch;
        }

        public void onExpiration() {
        }

        public void onComplete() {
            this.fetch.finish();
        }

        public boolean tryComplete() {
            if (this.fetch.isComplete()) {
                return this.forceComplete();
            }
            return false;
        }
    }
}

