/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.io.DataOutputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.IsolationLevel;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class FetcherTest {
    // ---- Shared fixtures. Field initialisers run top to bottom, and the fetchers at the
    // ---- end are built from the fields above them, so the declaration order matters.

    // Listener handed to SubscriptionState.subscribe() by the rebalance test.
    private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
    // Topic and consumer group used throughout the tests.
    private String topicName = "test";
    private String groupId = "test-group";
    // Metric group name: "consumer" + groupId + "-fetch-manager-metrics" (no separator after "consumer").
    private final String metricGroup = "consumer" + this.groupId + "-fetch-manager-metrics";
    // The two partitions (0 and 1) of the test topic.
    private TopicPartition tp1 = new TopicPartition(this.topicName, 0);
    private TopicPartition tp2 = new TopicPartition(this.topicName, 1);
    // Fetch tuning values; NOTE(review): presumably consumed by createFetcher(), which is
    // defined outside this part of the file — confirm there.
    private int minBytes = 1;
    private int maxBytes = Integer.MAX_VALUE;
    private int maxWaitMs = 0;
    private int fetchSize = 1000;
    private long retryBackoffMs = 100L;
    // Mock clock; NOTE(review): the 1L argument is presumably an auto-tick in ms — confirm against MockTime.
    private MockTime time = new MockTime(1L);
    // NOTE(review): arguments are presumably (refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation) — confirm.
    private Metadata metadata = new Metadata(0L, Long.MAX_VALUE, true);
    // Mock network client driven by the mock clock and the metadata above.
    private MockClient client = new MockClient(this.time, this.metadata);
    // Cluster containing the test topic with 2 partitions; `node` is its first node.
    private Cluster cluster = TestUtils.singletonCluster(this.topicName, 2);
    private Node node = (Node)this.cluster.nodes().get(0);
    // Metrics instance backing `fetcher`.
    private Metrics metrics = new Metrics((Time)this.time);
    // Package-private registry built with the same "consumer" + groupId prefix as metricGroup.
    FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry("consumer" + this.groupId);
    // Subscription state that resets to EARLIEST, and one with no automatic reset (NONE).
    private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
    private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
    // Tolerance constant for floating-point comparisons; its uses are outside this part of the file.
    private static final double EPSILON = 1.0E-4;
    // NOTE(review): the trailing 100L / 1000L are presumably retry backoff and request timeout in ms — confirm.
    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient((KafkaClient)this.client, this.metadata, (Time)this.time, 100L, 1000L);
    // Record batches rebuilt by setup() before every test.
    private MemoryRecords records;
    private MemoryRecords nextRecords;
    // Fetcher under test, bound to `subscriptions` and `metrics`.
    private Fetcher<byte[], byte[]> fetcher = this.createFetcher(this.subscriptions, this.metrics);
    // Separate metrics instance and fetcher bound to the no-auto-reset subscription state.
    private Metrics fetcherMetrics = new Metrics((Time)this.time);
    private Fetcher<byte[], byte[]> fetcherNoAutoReset = this.createFetcher(this.subscriptionsNoAutoReset, this.fetcherMetrics);

    /**
     * Prepares the shared fixtures before each test.
     *
     * <p>Publishes the test cluster to the metadata, points the mock client at the
     * cluster's first node, and rebuilds the two uncompressed CREATE_TIME batches:
     * {@code records} (three records, builder base offset 1) and {@code nextRecords}
     * (two records, builder base offset 4). Every record uses timestamp 0 and key "key".
     */
    @Before
    public void setup() throws Exception {
        metadata.update(cluster, Collections.emptySet(), time.milliseconds());
        client.setNode(node);

        MemoryRecordsBuilder firstBatch = MemoryRecords.builder(
                ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
        for (String value : new String[] {"value-1", "value-2", "value-3"}) {
            firstBatch.append(0L, "key".getBytes(), value.getBytes());
        }
        this.records = firstBatch.build();

        MemoryRecordsBuilder secondBatch = MemoryRecords.builder(
                ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 4L);
        for (String value : new String[] {"value-4", "value-5"}) {
            secondBatch.append(0L, "key".getBytes(), value.getBytes());
        }
        this.nextRecords = secondBatch.build();
    }

    /**
     * Releases the per-test resources created by the field initialisers.
     *
     * <p>Both fetchers are closed, followed by the metrics instance each one was
     * built with. The previous version closed {@code fetcherMetrics} twice and
     * never closed {@code fetcherNoAutoReset}.
     */
    @After
    public void teardown() {
        // Close the fetchers first so nothing touches a metrics instance after it is closed.
        this.fetcher.close();
        this.fetcherNoAutoReset.close();
        this.metrics.close();
        this.fetcherMetrics.close();
    }

    /**
     * Happy path: a single fetch for {@code tp1} from offset 0 is answered with the
     * three-record batch built in {@code setup()}.
     *
     * <p>Expects the fetch to complete only after the client is polled, three records
     * to be returned with offsets 1, 2 and 3, and the consumer position to advance to 4.
     */
    @Test
    public void testFetchNormal() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);

        // Exactly one request goes out, and nothing is completed before the response arrives.
        Assert.assertEquals(1L, fetcher.sendFetches());
        Assert.assertFalse(fetcher.hasCompletedFetches());

        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
        consumerClient.poll(0L);
        Assert.assertTrue(fetcher.hasCompletedFetches());

        // Use the fetcher's generic types: iterating a raw List with a ConsumerRecord
        // loop variable (as the decompiled code did) does not compile.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
        Assert.assertTrue(partitionRecords.containsKey(tp1));

        List<ConsumerRecord<byte[], byte[]>> fetched = partitionRecords.get(tp1);
        Assert.assertEquals(3L, fetched.size());
        // Explicit unboxing keeps the assertEquals overload unambiguous.
        Assert.assertEquals(4L, (long) subscriptions.position(tp1));

        long expectedOffset = 1L;
        for (ConsumerRecord<byte[], byte[]> record : fetched) {
            Assert.assertEquals(expectedOffset, record.offset());
            expectedOffset++;
        }
    }

    /**
     * Checks that only the data record is returned when the response also carries a
     * control record.
     *
     * <p>The response buffer holds one idempotent-producer batch with a single record
     * (key "key", null value) followed by an ABORT end-transaction marker. The test
     * expects exactly one record back, carrying the key, and a position of 2.
     */
    @Test
    public void testFetcherIgnoresControlRecords() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());
        Assert.assertFalse(fetcher.hasCompletedFetches());

        long producerId = 1L;
        short producerEpoch = 0;
        int baseSequence = 0;
        int partitionLeaderEpoch = 0;

        // Data batch first, then the control marker, both written into the same buffer.
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        MemoryRecordsBuilder dataBatch = MemoryRecords.idempotentBuilder(
                buffer, CompressionType.NONE, 0L, producerId, producerEpoch, baseSequence);
        dataBatch.append(0L, "key".getBytes(), null);
        dataBatch.close();
        MemoryRecords.writeEndTransactionalMarker(
                buffer, 1L, time.milliseconds(), partitionLeaderEpoch, producerId, producerEpoch,
                new EndTransactionMarker(ControlRecordType.ABORT, 0));
        buffer.flip();

        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
        consumerClient.poll(0L);
        Assert.assertTrue(fetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
        Assert.assertTrue(partitionRecords.containsKey(tp1));

        List<ConsumerRecord<byte[], byte[]>> fetched = partitionRecords.get(tp1);
        Assert.assertEquals(1L, fetched.size());
        Assert.assertEquals(2L, (long) subscriptions.position(tp1));

        ConsumerRecord<byte[], byte[]> onlyRecord = fetched.get(0);
        Assert.assertArrayEquals("key".getBytes(), onlyRecord.key());
    }

    /**
     * A fetch response carrying NOT_LEADER_FOR_PARTITION still counts as a completed
     * fetch, but yields no records for the partition.
     */
    @Test
    public void testFetchError() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());
        Assert.assertFalse(fetcher.hasCompletedFetches());

        client.prepareResponse(fetchResponse(tp1, records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
        consumerClient.poll(0L);
        Assert.assertTrue(fetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords();
        Assert.assertFalse(partitionRecords.containsKey(tp1));
    }

    /**
     * Builds a request matcher that accepts a fetch request only when it includes
     * {@code tp} and asks for exactly {@code offset} on that partition.
     *
     * @param tp     partition that must be present in the fetch request
     * @param offset fetch offset the request must carry for {@code tp}
     * @return matcher for use with {@code MockClient.prepareResponse}
     */
    private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
        return new MockClient.RequestMatcher() {

            @Override
            public boolean matches(AbstractRequest body) {
                Map<TopicPartition, FetchRequest.PartitionData> requested = ((FetchRequest) body).fetchData();
                if (!requested.containsKey(tp)) {
                    return false;
                }
                return requested.get(tp).fetchOffset == offset;
            }
        };
    }

    /**
     * Checks that a deserialization failure surfaces from {@code fetchedRecords()} and
     * that the position stays at 1 on repeated calls.
     *
     * <p>The same deserializer instance is used for keys and values; it throws a
     * {@link SerializationException} on every second invocation, after checking that
     * the payload it was asked to decode is "value-1".
     */
    @Test
    public void testFetchedRecordsRaisesOnSerializationErrors() {
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer() {
            int calls = 0;

            @Override
            public byte[] deserialize(String topic, byte[] data) {
                boolean failThisCall = calls % 2 == 1;
                calls++;
                if (!failThisCall) {
                    return data;
                }
                Assert.assertEquals("value-1", new String(data, StandardCharsets.UTF_8));
                throw new SerializationException();
            }
        };

        Fetcher<?, ?> failingFetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer);

        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 1L);

        client.prepareResponse(matchesOffset(tp1, 1L), fetchResponse(tp1, records, Errors.NONE, 100L, 0));

        Assert.assertEquals(1L, failingFetcher.sendFetches());
        consumerClient.poll(0L);

        // The failure must repeat on a second call, with the position unchanged.
        for (int attempt = 0; attempt < 2; attempt++) {
            try {
                failingFetcher.fetchedRecords();
                Assert.fail("fetchedRecords should have raised");
            } catch (SerializationException expected) {
                Assert.assertEquals(1L, (long) subscriptions.position(tp1));
            }
        }
    }

    /**
     * Feeds the fetcher a hand-written buffer of legacy (magic 1) records in which some
     * entries are corrupt, and checks that consumption blocks on each bad entry until
     * the caller seeks past it.
     *
     * <p>Buffer layout by offset: 0 valid, 1 wrong CRC (crc + 1), 2 valid, 3 a bare
     * header declaring size 1 with no record body, 4 valid.
     */
    @Test
    public void testParseCorruptedRecord() throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));

        byte magic = 1;
        byte[] key = "foo".getBytes();
        byte[] value = "baz".getBytes();
        long offset = 0L;
        long timestamp = 500L;

        int size = LegacyRecord.recordSize(magic, key.length, value.length);
        byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
        long crc = LegacyRecord.computeChecksum(magic, attributes, timestamp, key, value);

        // offset 0: valid record
        out.writeLong(offset);
        out.writeInt(size);
        LegacyRecord.write(out, magic, crc, attributes, timestamp, key, value);

        // offset 1: same payload, deliberately wrong checksum
        out.writeLong(offset + 1L);
        out.writeInt(size);
        LegacyRecord.write(out, magic, crc + 1L, attributes, timestamp, key, value);

        // offset 2: valid record
        out.writeLong(offset + 2L);
        out.writeInt(size);
        LegacyRecord.write(out, magic, crc, attributes, timestamp, key, value);

        // offset 3: header only, declaring a 1-byte size and no body
        out.writeLong(offset + 3L);
        out.writeInt(1);

        // offset 4: valid record
        out.writeLong(offset + 4L);
        out.writeInt(size);
        LegacyRecord.write(out, magic, crc, attributes, timestamp, key, value);

        buffer.flip();

        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        // The first call returns only the record at offset 0.
        Assert.assertEquals(1L, fetcher.fetchedRecords().get(tp1).size());
        Assert.assertEquals(1L, (long) subscriptions.position(tp1));

        ensureBlockOnRecord(1L);
        seekAndConsumeRecord(buffer, 2L);
        ensureBlockOnRecord(3L);
        try {
            seekAndConsumeRecord(buffer, 4L);
            Assert.fail("Should have thrown exception when fail to retrieve a record from iterator.");
        } catch (KafkaException expected) {
            // Expected: the test requires this call to raise.
        }
        ensureBlockOnRecord(4L);
    }

    /**
     * Asserts that {@code fetchedRecords()} throws a {@link KafkaException} on two
     * consecutive calls and that the position of {@code tp1} stays at
     * {@code blockedOffset} each time.
     *
     * @param blockedOffset offset the consumer position is expected to be stuck at
     */
    private void ensureBlockOnRecord(long blockedOffset) {
        for (int attempt = 0; attempt < 2; attempt++) {
            try {
                fetcher.fetchedRecords();
                Assert.fail("fetchedRecords should have raised KafkaException");
            } catch (KafkaException expected) {
                Assert.assertEquals(blockedOffset, (long) subscriptions.position(tp1));
            }
        }
    }

    /**
     * Seeks {@code tp1} to {@code toOffset}, issues a fresh fetch answered with
     * {@code responseBuffer}, and asserts that exactly one record with that offset is
     * returned and that the position moves to {@code toOffset + 1}.
     *
     * @param responseBuffer raw record bytes served as the fetch response
     * @param toOffset       offset to seek to and expect back
     */
    private void seekAndConsumeRecord(ByteBuffer responseBuffer, long toOffset) {
        subscriptions.seek(tp1, toOffset);
        // Result intentionally discarded; this call runs before the new fetch is sent.
        fetcher.fetchedRecords();

        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(
                fetchResponse(tp1, MemoryRecords.readableRecords(responseBuffer), Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        List<ConsumerRecord<byte[], byte[]>> consumed = fetcher.fetchedRecords().get(tp1);
        Assert.assertEquals(1L, consumed.size());
        Assert.assertEquals(toOffset, consumed.get(0).offset());
        Assert.assertEquals(toOffset + 1L, (long) subscriptions.position(tp1));
    }

    /**
     * Builds a single-record batch, overwrites four bytes starting at buffer position 17
     * with "beef", and checks that {@code fetchedRecords()} raises a
     * {@link KafkaException} on two consecutive calls while the position stays at 0.
     */
    @Test
    public void testInvalidDefaultRecordBatch() {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);

        // Explicit (byte)/(short) casts: Java does not narrow int literals in method or
        // constructor invocation, so the bare literals emitted by the decompiler would
        // not match byte/short parameters. NOTE(review): 2 is presumably the record-format
        // magic value and the cast 0 the producer epoch — confirm against the
        // MemoryRecordsBuilder constructor.
        MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
                out, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME,
                0L, 10L, 0L, (short) 0, 0, false, false, 0, 1024);
        builder.append(10L, "key".getBytes(), "value".getBytes());
        builder.close();
        buffer.flip();

        // Corrupt the batch in place, then rewind so the whole buffer is served.
        // NOTE(review): position 17 is presumably the batch CRC field — confirm.
        buffer.position(17);
        buffer.put("beef".getBytes());
        buffer.position(0);

        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        // The failure must repeat on a second call, with the position unchanged.
        for (int attempt = 0; attempt < 2; attempt++) {
            try {
                fetcher.fetchedRecords();
                Assert.fail("fetchedRecords should have raised KafkaException");
            } catch (KafkaException expected) {
                Assert.assertEquals(0L, (long) subscriptions.position(tp1));
            }
        }
    }

    /**
     * Flips bits in a three-record batch by XOR-ing the byte at index 32 and writing the
     * result back as an int, then checks that {@code fetchedRecords()} raises a
     * {@link KafkaException} and leaves the position at 0.
     */
    @Test
    public void testParseInvalidRecordBatch() throws Exception {
        SimpleRecord[] payload = new SimpleRecord[] {
            new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
        };
        MemoryRecords batch = MemoryRecords.withRecords(
                (byte) 2, 0L, CompressionType.NONE, TimestampType.CREATE_TIME, payload);

        // Corrupt the batch in place.
        ByteBuffer buffer = batch.buffer();
        int garbled = buffer.get(32) ^ 0x5332717;
        buffer.putInt(32, garbled);

        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        try {
            fetcher.fetchedRecords();
            Assert.fail("fetchedRecords should have raised");
        } catch (KafkaException expected) {
            Assert.assertEquals(0L, (long) subscriptions.position(tp1));
        }
    }

    /**
     * Checks that record headers survive the fetch path.
     *
     * <p>Three records are served: one without headers, one with a single "headerKey"
     * header, and one with two "headerKey" headers. {@code lastHeader("headerKey")} must
     * be null for the first, "headerValue" for the second and "headerValue2" for the third.
     */
    @Test
    public void testHeaders() {
        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time));

        MemoryRecordsBuilder builder = MemoryRecords.builder(
                ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
        builder.append(0L, "key".getBytes(), "value-1".getBytes());

        Header[] singleHeader = new Header[] {
            new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8))
        };
        builder.append(0L, "key".getBytes(), "value-2".getBytes(), singleHeader);

        Header[] duplicateKeyHeaders = new Header[] {
            new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8)),
            new RecordHeader("headerKey", "headerValue2".getBytes(StandardCharsets.UTF_8))
        };
        builder.append(0L, "key".getBytes(), "value-3".getBytes(), duplicateKeyHeaders);

        MemoryRecords memoryRecords = builder.build();

        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 1L);

        client.prepareResponse(matchesOffset(tp1, 1L), fetchResponse(tp1, memoryRecords, Errors.NONE, 100L, 0));

        Assert.assertEquals(1L, fetcher.sendFetches());
        consumerClient.poll(0L);

        List<ConsumerRecord<byte[], byte[]>> fetched = fetcher.fetchedRecords().get(tp1);
        Assert.assertEquals(3L, fetched.size());

        Iterator<ConsumerRecord<byte[], byte[]>> cursor = fetched.iterator();

        // Record 1: no headers at all.
        ConsumerRecord<byte[], byte[]> current = cursor.next();
        Assert.assertNull(current.headers().lastHeader("headerKey"));

        // Record 2: a single header.
        current = cursor.next();
        Assert.assertEquals("headerValue", new String(current.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
        Assert.assertEquals("headerKey", current.headers().lastHeader("headerKey").key());

        // Record 3: two headers under the same key; the last one wins.
        current = cursor.next();
        Assert.assertEquals("headerValue2", new String(current.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
        Assert.assertEquals("headerKey", current.headers().lastHeader("headerKey").key());
    }

    /**
     * Exercises a fetcher created with a third {@code createFetcher} argument of 2, so
     * that each {@code fetchedRecords()} call returns at most two records.
     *
     * <p>The first batch (offsets 1-3) is drained as 2 + 1 records without a new fetch
     * being sent in between; only then is a second fetch issued, which returns
     * offsets 4 and 5.
     */
    @Test
    public void testFetchMaxPollRecords() {
        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);

        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 1L);

        client.prepareResponse(matchesOffset(tp1, 1L), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
        client.prepareResponse(matchesOffset(tp1, 4L), fetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0));

        // First poll: two of the three records from the first batch.
        Assert.assertEquals(1L, fetcher.sendFetches());
        consumerClient.poll(0L);
        List<ConsumerRecord<byte[], byte[]>> polled = fetcher.fetchedRecords().get(tp1);
        Assert.assertEquals(2L, polled.size());
        Assert.assertEquals(3L, (long) subscriptions.position(tp1));
        Assert.assertEquals(1L, polled.get(0).offset());
        Assert.assertEquals(2L, polled.get(1).offset());

        // Second poll: no new request is sent; the remaining record is returned.
        Assert.assertEquals(0L, fetcher.sendFetches());
        consumerClient.poll(0L);
        polled = fetcher.fetchedRecords().get(tp1);
        Assert.assertEquals(1L, polled.size());
        Assert.assertEquals(4L, (long) subscriptions.position(tp1));
        Assert.assertEquals(3L, polled.get(0).offset());

        // Third poll: a new fetch goes out and returns the second batch.
        Assert.assertTrue(fetcher.sendFetches() > 0);
        consumerClient.poll(0L);
        polled = fetcher.fetchedRecords().get(tp1);
        Assert.assertEquals(2L, polled.size());
        Assert.assertEquals(6L, (long) subscriptions.position(tp1));
        Assert.assertEquals(4L, polled.get(0).offset());
        Assert.assertEquals(5L, polled.get(1).offset());
    }

    /**
     * Checks that records still buffered for a partition are dropped once that
     * partition is no longer assigned.
     *
     * <p>Two of the three records for {@code tp1} are consumed, then the assignment
     * switches to {@code tp2}. The next {@code fetchedRecords()} call must contain
     * nothing for {@code tp1} and both records (offsets 4 and 5) for {@code tp2}.
     */
    @Test
    public void testFetchAfterPartitionWithFetchedRecordsIsUnassigned() {
        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2);

        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 1L);

        client.prepareResponse(matchesOffset(tp1, 1L), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));

        Assert.assertEquals(1L, fetcher.sendFetches());
        consumerClient.poll(0L);
        List<ConsumerRecord<byte[], byte[]>> polled = fetcher.fetchedRecords().get(tp1);
        Assert.assertEquals(2L, polled.size());
        Assert.assertEquals(3L, (long) subscriptions.position(tp1));
        Assert.assertEquals(1L, polled.get(0).offset());
        Assert.assertEquals(2L, polled.get(1).offset());

        // Replace the assignment while one tp1 record is still unconsumed.
        subscriptions.assignFromUser(Collections.singleton(tp2));
        client.prepareResponse(matchesOffset(tp2, 4L), fetchResponse(tp2, this.nextRecords, Errors.NONE, 100L, 0));
        subscriptions.seek(tp2, 4L);

        Assert.assertEquals(1L, fetcher.sendFetches());
        consumerClient.poll(0L);
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
        Assert.assertNull(fetchedRecords.get(tp1));

        polled = fetchedRecords.get(tp2);
        Assert.assertEquals(2L, polled.size());
        Assert.assertEquals(6L, (long) subscriptions.position(tp2));
        Assert.assertEquals(4L, polled.get(0).offset());
        Assert.assertEquals(5L, polled.get(1).offset());
    }

    /**
     * Serves a batch whose records sit at non-adjacent offsets (15, 20 and 30) and
     * checks that all three are returned with those offsets and that the position
     * ends at 31.
     */
    @Test
    public void testFetchNonContinuousRecords() {
        MemoryRecordsBuilder builder = MemoryRecords.builder(
                ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        builder.appendWithOffset(15L, 0L, "key".getBytes(), "value-1".getBytes());
        builder.appendWithOffset(20L, 0L, "key".getBytes(), "value-2".getBytes());
        builder.appendWithOffset(30L, 0L, "key".getBytes(), "value-3".getBytes());
        MemoryRecords sparseBatch = builder.build();

        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(fetchResponse(tp1, sparseBatch, Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        List<ConsumerRecord<byte[], byte[]>> consumerRecords = fetcher.fetchedRecords().get(tp1);
        Assert.assertEquals(3L, consumerRecords.size());
        Assert.assertEquals(31L, (long) subscriptions.position(tp1));

        Assert.assertEquals(15L, consumerRecords.get(0).offset());
        Assert.assertEquals(20L, consumerRecords.get(1).offset());
        Assert.assertEquals(30L, consumerRecords.get(2).offset());
    }

    /**
     * With the node's FETCH API range pinned to 2..2, an incomplete record in the
     * response must surface as a {@link RecordTooLargeException} whose message starts
     * with the "[Partition=Offset]" text, while the position stays at 0.
     *
     * <p>The node's API versions are restored to the default in {@code finally} so the
     * override cannot leak out of this test.
     */
    @Test
    public void testFetchRequestWhenRecordTooLarge() {
        try {
            // Explicit (short) casts: Java does not narrow int literals in constructor
            // invocation, so the bare literals emitted by the decompiler would not match
            // short parameters. NOTE(review): the two values are presumably
            // (minVersion, maxVersion) — confirm against ApiVersionsResponse.ApiVersion.
            client.setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(
                    new ApiVersionsResponse.ApiVersion(ApiKeys.FETCH.id, (short) 2, (short) 2))));
            makeFetchRequestWithIncompleteRecord();
            fetcher.fetchedRecords();
            Assert.fail("RecordTooLargeException should have been raised");
        } catch (RecordTooLargeException e) {
            Assert.assertTrue(e.getMessage().startsWith("There are some messages at [Partition=Offset]: "));
            // The position must not have moved.
            Assert.assertEquals(0L, (long) subscriptions.position(tp1));
        } finally {
            client.setNodeApiVersions(NodeApiVersions.create());
        }
    }

    /**
     * With the default API versions, an incomplete record in the response must surface
     * as a {@link KafkaException} whose message starts with "Failed to make progress
     * reading messages", while the position stays at 0.
     */
    @Test
    public void testFetchRequestInternalError() {
        makeFetchRequestWithIncompleteRecord();
        try {
            fetcher.fetchedRecords();
            // The message previously named RecordTooLargeException, which is not what
            // this test expects; a wrong name here misleads anyone reading a failure.
            Assert.fail("KafkaException should have been raised");
        } catch (KafkaException e) {
            Assert.assertTrue(e.getMessage().startsWith("Failed to make progress reading messages"));
            // The position must not have moved.
            Assert.assertEquals(0L, (long) subscriptions.position(tp1));
        }
    }

    /**
     * Assigns {@code tp1} at offset 0, sends one fetch and answers it with a payload of
     * eight zero bytes, leaving the fetcher with a completed fetch to parse.
     */
    private void makeFetchRequestWithIncompleteRecord() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());
        Assert.assertFalse(fetcher.hasCompletedFetches());

        // A new byte[8] is zero-filled, i.e. the same eight zero bytes as before.
        ByteBuffer truncated = ByteBuffer.wrap(new byte[8]);
        MemoryRecords partialRecord = MemoryRecords.readableRecords(truncated);
        client.prepareResponse(fetchResponse(tp1, partialRecord, Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        Assert.assertTrue(fetcher.hasCompletedFetches());
    }

    /**
     * A TOPIC_AUTHORIZATION_FAILED fetch response must make {@code fetchedRecords()}
     * throw a {@link TopicAuthorizationException} that names exactly the test topic.
     */
    @Test
    public void testUnauthorizedTopic() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(fetchResponse(tp1, records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0));
        consumerClient.poll(0L);

        try {
            fetcher.fetchedRecords();
            Assert.fail("fetchedRecords should have thrown");
        } catch (TopicAuthorizationException e) {
            Assert.assertEquals(Collections.singleton(topicName), e.unauthorizedTopics());
        }
    }

    /**
     * Checks that a fetch response is discarded when the assignment is re-applied
     * between sending the request and receiving the response.
     *
     * <p>The partition is assigned again (to the same {@code tp1}) while the fetch is
     * in flight; afterwards {@code fetchedRecords()} must be empty.
     */
    @Test
    public void testFetchDuringRebalance() {
        subscriptions.subscribe(Collections.singleton(topicName), listener);
        subscriptions.assignFromSubscribed(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());

        // Re-assign while the request is still outstanding.
        subscriptions.assignFromSubscribed(Collections.singleton(tp1));

        client.prepareResponse(fetchResponse(tp1, records, Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        Assert.assertTrue(fetcher.fetchedRecords().isEmpty());
    }

    /**
     * Checks that records arriving for a partition that was paused while the fetch was
     * in flight are not handed out by {@code fetchedRecords()}.
     */
    @Test
    public void testInFlightFetchOnPausedPartition() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);

        Assert.assertEquals(1L, fetcher.sendFetches());

        // Pause after the request is sent but before the response is delivered.
        subscriptions.pause(tp1);

        client.prepareResponse(fetchResponse(tp1, records, Errors.NONE, 100L, 0));
        consumerClient.poll(0L);

        Assert.assertNull(fetcher.fetchedRecords().get(tp1));
    }

    /**
     * Checks that no fetch is sent for a partition that is already paused:
     * {@code sendFetches()} reports nothing sent and the mock client holds no requests.
     */
    @Test
    public void testFetchOnPausedPartition() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);
        subscriptions.pause(tp1);

        int sent = fetcher.sendFetches();
        Assert.assertFalse(sent > 0);
        Assert.assertTrue(client.requests().isEmpty());
    }

    /**
     * Verifies that a NOT_LEADER_FOR_PARTITION fetch error returns no records and
     * leaves the metadata due for an immediate update.
     */
    @Test
    public void testFetchNotLeaderForPartition() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);
        Assert.assertEquals(1L, fetcher.sendFetches());

        final AbstractResponse errorResponse = fetchResponse(tp1, records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0);
        client.prepareResponse(errorResponse);
        consumerClient.poll(0L);

        Assert.assertEquals(0L, fetcher.fetchedRecords().size());
        // A zero wait until the next update means a metadata refresh is due right now.
        final long now = time.milliseconds();
        Assert.assertEquals(0L, metadata.timeToNextUpdate(now));
    }

    /**
     * Verifies that an UNKNOWN_TOPIC_OR_PARTITION fetch error returns no records
     * and leaves the metadata due for an immediate update.
     */
    @Test
    public void testFetchUnknownTopicOrPartition() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);
        Assert.assertEquals(1L, fetcher.sendFetches());

        final AbstractResponse errorResponse = fetchResponse(tp1, records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0);
        client.prepareResponse(errorResponse);
        consumerClient.poll(0L);

        Assert.assertEquals(0L, fetcher.fetchedRecords().size());
        // A zero wait until the next update means a metadata refresh is due right now.
        final long now = time.milliseconds();
        Assert.assertEquals(0L, metadata.timeToNextUpdate(now));
    }

    /**
     * Verifies that an OFFSET_OUT_OF_RANGE fetch error (with the default fetcher and
     * subscription fixture) returns no records, flags the partition as needing an
     * offset reset and clears its position.
     */
    @Test
    public void testFetchOffsetOutOfRange() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);
        Assert.assertEquals(1L, fetcher.sendFetches());

        client.prepareResponse(fetchResponse(tp1, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
        consumerClient.poll(0L);

        Assert.assertEquals(0L, fetcher.fetchedRecords().size());
        Assert.assertTrue(subscriptions.isOffsetResetNeeded(tp1));
        // assertNull states the intent directly instead of comparing against a null literal.
        Assert.assertNull(subscriptions.position(tp1));
    }

    /**
     * Verifies that an OFFSET_OUT_OF_RANGE error for a fetch issued at an older
     * position is ignored once the partition has been moved by a seek: no reset is
     * requested and the new position is kept.
     */
    @Test
    public void testStaleOutOfRangeError() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);
        Assert.assertEquals(1L, fetcher.sendFetches());

        final AbstractResponse staleError = fetchResponse(tp1, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0);
        client.prepareResponse(staleError);
        // Move the position before the queued response gets delivered.
        subscriptions.seek(tp1, 1L);
        consumerClient.poll(0L);

        Assert.assertEquals(0L, fetcher.fetchedRecords().size());
        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp1));
        Assert.assertEquals(1L, subscriptions.position(tp1).longValue());
    }

    /**
     * Verifies, using the no-auto-reset fixture, that seeking after an
     * OFFSET_OUT_OF_RANGE response lets {@code fetchedRecords()} return an empty
     * result.
     */
    @Test
    public void testFetchedRecordsAfterSeek() {
        subscriptionsNoAutoReset.assignFromUser(Collections.singleton(tp1));
        subscriptionsNoAutoReset.seek(tp1, 0L);

        final boolean fetchSent = fetcherNoAutoReset.sendFetches() > 0;
        Assert.assertTrue(fetchSent);

        final AbstractResponse outOfRange = fetchResponse(tp1, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0);
        client.prepareResponse(outOfRange);
        consumerClient.poll(0L);
        Assert.assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1));

        // The seek happens before the error would be surfaced by fetchedRecords().
        subscriptionsNoAutoReset.seek(tp1, 2L);
        Assert.assertEquals(0L, fetcherNoAutoReset.fetchedRecords().size());
    }

    /**
     * Verifies, using the no-auto-reset fixture, that an OFFSET_OUT_OF_RANGE
     * response makes {@code fetchedRecords()} throw
     * {@link OffsetOutOfRangeException} naming exactly {@code tp1}, and that the
     * exception is thrown again on a second call.
     */
    @Test
    public void testFetchOffsetOutOfRangeException() {
        subscriptionsNoAutoReset.assignFromUser(Collections.singleton(tp1));
        subscriptionsNoAutoReset.seek(tp1, 0L);
        fetcherNoAutoReset.sendFetches();

        client.prepareResponse(fetchResponse(tp1, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
        consumerClient.poll(0L);
        Assert.assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1));

        // Two rounds: the exception must be raised on every call, not only the first.
        for (int attempt = 0; attempt < 2; attempt++) {
            try {
                fetcherNoAutoReset.fetchedRecords();
                Assert.fail("Should have thrown OffsetOutOfRangeException");
            } catch (OffsetOutOfRangeException e) {
                Assert.assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1));
                // Expected value goes first so a failure message reads correctly.
                Assert.assertEquals(1L, e.offsetOutOfRangePartitions().size());
            }
        }
    }

    /**
     * Verifies, using the no-auto-reset fixture, that when one response carries
     * records for {@code tp2} and an OFFSET_OUT_OF_RANGE error for {@code tp1}, the
     * records of {@code tp2} are returned and its position advanced, while a later
     * {@code fetchedRecords()} call throws an {@link OffsetOutOfRangeException}
     * naming only {@code tp1}.
     */
    @Test
    public void testFetchPositionAfterException() {
        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2));
        subscriptionsNoAutoReset.seek(tp1, 1L);
        subscriptionsNoAutoReset.seek(tp2, 1L);
        Assert.assertEquals(1L, fetcherNoAutoReset.sendFetches());

        // tp2 (healthy) is deliberately placed ahead of tp1 (out of range) in the response.
        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>();
        partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, -1L, null, this.records));
        partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, -1L, null, MemoryRecords.EMPTY));
        client.prepareResponse(new FetchResponse(new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>(partitions), 0));
        consumerClient.poll(0L);

        // Only the number of collected records matters, so element types stay opaque.
        List<Object> fetchedRecords = new ArrayList<Object>();
        List<OffsetOutOfRangeException> exceptions = new ArrayList<OffsetOutOfRangeException>();

        for (List<?> batch : fetcherNoAutoReset.fetchedRecords().values()) {
            fetchedRecords.addAll(batch);
        }
        // tp2 started at offset 1, so its position minus one equals the records consumed so far.
        Assert.assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp2) - 1L);

        try {
            for (List<?> batch : fetcherNoAutoReset.fetchedRecords().values()) {
                fetchedRecords.addAll(batch);
            }
        } catch (OffsetOutOfRangeException e) {
            exceptions.add(e);
        }

        Assert.assertEquals(4L, subscriptionsNoAutoReset.position(tp2).longValue());
        Assert.assertEquals(3L, fetchedRecords.size());
        Assert.assertEquals(1L, exceptions.size());

        OffsetOutOfRangeException thrown = exceptions.get(0);
        Assert.assertTrue(thrown.offsetOutOfRangePartitions().containsKey(tp1));
        // Expected value goes first so a failure message reads correctly.
        Assert.assertEquals(1L, thrown.offsetOutOfRangePartitions().size());
    }

    /**
     * Verifies, with a fetcher created with a limit of 2 (third
     * {@code createFetcher} argument), that records already fetched for
     * {@code tp1} keep being returned after {@code tp2} receives an
     * OFFSET_OUT_OF_RANGE error, and that seeking {@code tp2} afterwards lets
     * {@code fetchedRecords()} return an empty result.
     */
    @Test
    public void testSeekBeforeException() {
        Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptionsNoAutoReset, new Metrics((Time) time), 2);

        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1));
        subscriptionsNoAutoReset.seek(tp1, 1L);
        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0));
        consumerClient.poll(0L);
        // First call hands out two records.
        Assert.assertEquals(2L, fetcher.fetchedRecords().get(tp1).size());

        subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2));
        subscriptionsNoAutoReset.seek(tp2, 1L);
        Assert.assertEquals(1L, fetcher.sendFetches());
        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>();
        partitions.put(tp2, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, -1L, null, MemoryRecords.EMPTY));
        client.prepareResponse(new FetchResponse(partitions, 0));
        consumerClient.poll(0L);
        // One more tp1 record is handed out even though tp2 has failed.
        Assert.assertEquals(1L, fetcher.fetchedRecords().get(tp1).size());

        // Seeking tp2 before the next call means nothing is returned or thrown for it.
        subscriptionsNoAutoReset.seek(tp2, 10L);
        Assert.assertEquals(0L, fetcher.fetchedRecords().size());
    }

    /**
     * Verifies that a fetch whose response is prepared with the disconnected flag
     * set returns no records and leaves the partition fetchable at its original
     * position, without requesting an offset reset.
     */
    @Test
    public void testFetchDisconnected() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);
        Assert.assertEquals(1L, fetcher.sendFetches());

        final AbstractResponse response = fetchResponse(tp1, records, Errors.NONE, 100L, 0);
        // The second argument marks the prepared response as a disconnect.
        client.prepareResponse(response, true);
        consumerClient.poll(0L);

        Assert.assertEquals(0L, fetcher.fetchedRecords().size());
        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp1));
        Assert.assertTrue(subscriptions.isFetchable(tp1));
        Assert.assertEquals(0L, subscriptions.position(tp1).longValue());
    }

    /**
     * Verifies, using the no-auto-reset fixture, that updating fetch positions for
     * partitions with neither a position nor a committed offset throws
     * {@link NoOffsetForPartitionException} listing all of those partitions.
     */
    @Test
    public void testUpdateFetchPositionsNoneCommittedNoResetStrategy() {
        final Set<TopicPartition> tps = new HashSet<TopicPartition>(Arrays.asList(tp1, tp2));
        subscriptionsNoAutoReset.assignFromUser(tps);

        try {
            fetcherNoAutoReset.updateFetchPositions(tps);
            Assert.fail("Should have thrown NoOffsetForPartitionException");
        } catch (NoOffsetForPartitionException e) {
            Assert.assertEquals(tps, e.partitions());
        }
    }

    /**
     * Verifies that updating the fetch position of a partition with a committed
     * offset moves the position to that committed offset.
     */
    @Test
    public void testUpdateFetchPositionToCommitted() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        final OffsetAndMetadata committed = new OffsetAndMetadata(5L);
        subscriptions.committed(tp1, committed);

        fetcher.updateFetchPositions(Collections.singleton(tp1));

        Assert.assertTrue(subscriptions.isFetchable(tp1));
        Assert.assertEquals(5L, subscriptions.position(tp1).longValue());
    }

    /**
     * Verifies that a partition with no committed offset and no explicit reset
     * request gets its position from a list-offsets lookup.
     */
    @Test
    public void testUpdateFetchPositionResetToDefaultOffset() {
        subscriptions.assignFromUser(Collections.singleton(tp1));

        // The expected request carries -2, the same value the EARLIEST reset test matches on.
        client.prepareResponse(listOffsetRequestMatcher(-2L), listOffsetResponse(Errors.NONE, 1L, 5L));
        fetcher.updateFetchPositions(Collections.singleton(tp1));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp1));
        Assert.assertTrue(subscriptions.isFetchable(tp1));
        Assert.assertEquals(5L, subscriptions.position(tp1).longValue());
    }

    /**
     * Verifies that a partition flagged for a LATEST reset gets its position from a
     * list-offsets lookup whose request matches {@code -1}.
     */
    @Test
    public void testUpdateFetchPositionResetToLatestOffset() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);

        client.prepareResponse(listOffsetRequestMatcher(-1L), listOffsetResponse(Errors.NONE, 1L, 5L));
        fetcher.updateFetchPositions(Collections.singleton(tp1));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp1));
        Assert.assertTrue(subscriptions.isFetchable(tp1));
        Assert.assertEquals(5L, subscriptions.position(tp1).longValue());
    }

    /**
     * Verifies, for every {@link IsolationLevel}, that the list-offsets request
     * issued during an offset reset carries the isolation level the fetcher was
     * created with.
     */
    @Test
    public void testListOffsetsSendsIsolationLevel() {
        for (final IsolationLevel level : IsolationLevel.values()) {
            Fetcher fetcher = createFetcher(subscriptions, new Metrics(), (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer(), Integer.MAX_VALUE, level);
            subscriptions.assignFromUser(Collections.singleton(tp1));
            subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);

            // The prepared response is only used when the request's isolation level matches.
            MockClient.RequestMatcher isolationLevelMatcher = new MockClient.RequestMatcher() {
                @Override
                public boolean matches(AbstractRequest body) {
                    return ((ListOffsetRequest) body).isolationLevel() == level;
                }
            };
            client.prepareResponse(isolationLevelMatcher, listOffsetResponse(Errors.NONE, 1L, 5L));
            fetcher.updateFetchPositions(Collections.singleton(tp1));

            Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp1));
            Assert.assertTrue(subscriptions.isFetchable(tp1));
            Assert.assertEquals(5L, subscriptions.position(tp1).longValue());
        }
    }

    /**
     * Verifies that a partition flagged for an EARLIEST reset gets its position
     * from a list-offsets lookup whose request matches {@code -2}.
     */
    @Test
    public void testUpdateFetchPositionResetToEarliestOffset() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.EARLIEST);

        client.prepareResponse(listOffsetRequestMatcher(-2L), listOffsetResponse(Errors.NONE, 1L, 5L));
        fetcher.updateFetchPositions(Collections.singleton(tp1));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp1));
        Assert.assertTrue(subscriptions.isFetchable(tp1));
        Assert.assertEquals(5L, subscriptions.position(tp1).longValue());
    }

    /**
     * Verifies that an offset reset still completes when the first list-offsets
     * exchange ends in a disconnect and a second response is available.
     */
    @Test
    public void testUpdateFetchPositionDisconnect() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);

        // First attempt: prepared with the disconnected flag set.
        client.prepareResponse(listOffsetRequestMatcher(-1L), listOffsetResponse(Errors.NONE, 1L, 5L), true);
        // Second attempt: a normal response.
        client.prepareResponse(listOffsetRequestMatcher(-1L), listOffsetResponse(Errors.NONE, 1L, 5L));
        fetcher.updateFetchPositions(Collections.singleton(tp1));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp1));
        Assert.assertTrue(subscriptions.isFetchable(tp1));
        Assert.assertEquals(5L, subscriptions.position(tp1).longValue());
    }

    /**
     * Verifies that a paused partition flagged for an offset reset has its
     * position reset from the list-offsets response (not the committed offset)
     * while remaining non-fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.committed(tp1, new OffsetAndMetadata(0L));
        subscriptions.pause(tp1);
        subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);

        client.prepareResponse(listOffsetRequestMatcher(-1L), listOffsetResponse(Errors.NONE, 1L, 10L));
        fetcher.updateFetchPositions(Collections.singleton(tp1));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp1));
        // Still paused, hence not fetchable even though the position is valid.
        Assert.assertFalse(subscriptions.isFetchable(tp1));
        Assert.assertTrue(subscriptions.hasValidPosition(tp1));
        Assert.assertEquals(10L, subscriptions.position(tp1).longValue());
    }

    /**
     * Verifies that a paused partition without a committed offset obtains a valid
     * position from a list-offsets lookup while remaining non-fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithoutACommittedOffset() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.pause(tp1);

        client.prepareResponse(listOffsetRequestMatcher(-2L), listOffsetResponse(Errors.NONE, 1L, 0L));
        fetcher.updateFetchPositions(Collections.singleton(tp1));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp1));
        // Still paused, hence not fetchable even though the position is valid.
        Assert.assertFalse(subscriptions.isFetchable(tp1));
        Assert.assertTrue(subscriptions.hasValidPosition(tp1));
        Assert.assertEquals(0L, subscriptions.position(tp1).longValue());
    }

    /**
     * Verifies that for a partition that is paused and then sought to offset 10,
     * updating fetch positions keeps position 10 (rather than the committed offset
     * 0) and leaves the partition non-fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.committed(tp1, new OffsetAndMetadata(0L));
        // Pause first, then seek.
        subscriptions.pause(tp1);
        subscriptions.seek(tp1, 10L);

        fetcher.updateFetchPositions(Collections.singleton(tp1));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp1));
        Assert.assertFalse(subscriptions.isFetchable(tp1));
        Assert.assertTrue(subscriptions.hasValidPosition(tp1));
        Assert.assertEquals(10L, subscriptions.position(tp1).longValue());
    }

    /**
     * Verifies that for a partition that is sought to offset 10 and then paused,
     * updating fetch positions keeps position 10 (rather than the committed offset
     * 0) and leaves the partition non-fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.committed(tp1, new OffsetAndMetadata(0L));
        // Seek first, then pause.
        subscriptions.seek(tp1, 10L);
        subscriptions.pause(tp1);

        fetcher.updateFetchPositions(Collections.singleton(tp1));

        Assert.assertFalse(subscriptions.isOffsetResetNeeded(tp1));
        Assert.assertFalse(subscriptions.isFetchable(tp1));
        Assert.assertTrue(subscriptions.hasValidPosition(tp1));
        Assert.assertEquals(10L, subscriptions.position(tp1).longValue());
    }

    /**
     * Verifies that {@code getAllTopicMetadata} returns as many topics as the test
     * cluster fixture contains.
     */
    @Test
    public void testGetAllTopics() {
        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));

        final int returnedTopicCount = fetcher.getAllTopicMetadata(5000L).size();

        Assert.assertEquals(cluster.topics().size(), returnedTopicCount);
    }

    /**
     * Verifies that {@code getAllTopicMetadata} still returns all topics of the
     * test cluster when the first exchange ends in a disconnect.
     */
    @Test
    public void testGetAllTopicsDisconnect() {
        // First exchange: no response body, disconnected flag set.
        client.prepareResponse(null, true);
        // Second exchange: a regular metadata response.
        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));

        final int returnedTopicCount = fetcher.getAllTopicMetadata(5000L).size();

        Assert.assertEquals(cluster.topics().size(), returnedTopicCount);
    }

    /**
     * Verifies that {@code getAllTopicMetadata} throws a {@code TimeoutException}
     * when no response has been prepared for the request.
     */
    @Test(expected = TimeoutException.class)
    public void testGetAllTopicsTimeout() {
        final long timeoutMs = 50L;
        fetcher.getAllTopicMetadata(timeoutMs);
    }

    /**
     * Verifies that a TOPIC_AUTHORIZATION_FAILED metadata response makes
     * {@code getAllTopicMetadata} throw a {@code TopicAuthorizationException}
     * naming the unauthorized topic.
     */
    @Test
    public void testGetAllTopicsUnauthorized() {
        client.prepareResponse(newMetadataResponse(topicName, Errors.TOPIC_AUTHORIZATION_FAILED));

        try {
            fetcher.getAllTopicMetadata(10L);
            Assert.fail();
        } catch (TopicAuthorizationException e) {
            Assert.assertEquals(Collections.singleton(topicName), e.unauthorizedTopics());
        }
    }

    /**
     * Verifies that an INVALID_TOPIC_EXCEPTION metadata response makes
     * {@code getTopicMetadata} throw an {@link InvalidTopicException}.
     */
    @Test(expected = InvalidTopicException.class)
    public void testGetTopicMetadataInvalidTopic() {
        client.prepareResponse(newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION));

        final MetadataRequest.Builder request = new MetadataRequest.Builder(Collections.singletonList(topicName), true);
        fetcher.getTopicMetadata(request, 5000L);
    }

    /**
     * Verifies that an UNKNOWN_TOPIC_OR_PARTITION metadata response yields a
     * result without an entry for the requested topic.
     */
    @Test
    public void testGetTopicMetadataUnknownTopic() {
        client.prepareResponse(newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION));

        final MetadataRequest.Builder request = new MetadataRequest.Builder(Collections.singletonList(topicName), true);
        final Map<?, ?> topicMetadata = fetcher.getTopicMetadata(request, 5000L);

        Assert.assertNull(topicMetadata.get(topicName));
    }

    /**
     * Verifies that {@code getTopicMetadata} returns the topic when a
     * LEADER_NOT_AVAILABLE response is followed by a successful one.
     */
    @Test
    public void testGetTopicMetadataLeaderNotAvailable() {
        // The first response reports the error, the second one succeeds.
        client.prepareResponse(newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE));
        client.prepareResponse(newMetadataResponse(topicName, Errors.NONE));

        final MetadataRequest.Builder request = new MetadataRequest.Builder(Collections.singletonList(topicName), true);
        final Map<?, ?> topicMetadata = fetcher.getTopicMetadata(request, 5000L);

        Assert.assertTrue(topicMetadata.containsKey(topicName));
    }

    /**
     * Verifies that throttle times are recorded on the fetcher's throttle-time
     * sensor: drives a real {@link NetworkClient} over a mock selector, feeds it an
     * API-versions response followed by three fetch responses, and checks the
     * average and maximum throttle-time metrics.
     *
     * @throws Exception if any of the client interactions fails
     */
    @Test
    public void testQuotaMetrics() throws Exception {
        MockSelector selector = new MockSelector(time);
        Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry);
        Cluster singleNodeCluster = TestUtils.singletonCluster("test", 1);
        Node node = singleNodeCluster.nodes().get(0);
        NetworkClient networkClient = new NetworkClient((Selectable) selector, metadata, "mock", Integer.MAX_VALUE, 1000L, 1000L, 65536, 65536, 1000, (Time) time, true, new ApiVersions(), throttleTimeSensor);

        // try/finally guarantees the client is closed even when an assertion or a poll fails.
        try {
            // Queue the API-versions response that the client consumes while connecting.
            short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
            ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, (byte) 2).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
            selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
            while (!networkClient.ready(node, time.milliseconds())) {
                networkClient.poll(1L, time.milliseconds());
            }
            selector.clear();

            // Three fetch round trips with throttle times of 100, 200 and 300 ms.
            for (int i = 1; i <= 3; i++) {
                int throttleTimeMs = 100 * i;
                FetchRequest.Builder builder = FetchRequest.Builder.forConsumer(100, 100, new LinkedHashMap());
                ClientRequest request = networkClient.newClientRequest(node.idString(), builder, time.milliseconds(), true, null);
                networkClient.send(request, time.milliseconds());
                networkClient.poll(1L, time.milliseconds());

                FetchResponse response = fetchResponse(tp1, nextRecords, Errors.NONE, i, throttleTimeMs);
                buffer = response.serialize(ApiKeys.FETCH.latestVersion(), new ResponseHeader(request.correlationId()));
                selector.completeReceive(new NetworkReceive(node.idString(), buffer));
                networkClient.poll(1L, time.milliseconds());
                selector.clear();
            }

            Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
            KafkaMetric avgMetric = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg, new String[0]));
            KafkaMetric maxMetric = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchThrottleTimeMax, new String[0]));
            // The expected values (avg 250, max 400) also count the 400 passed to the API-versions response.
            Assert.assertEquals(250.0, avgMetric.value(), 1.0E-4);
            Assert.assertEquals(400.0, maxMetric.value(), 1.0E-4);
        } finally {
            networkClient.close();
        }
    }

    /**
     * Verifies the max-lag and per-partition lag metrics: they start at negative
     * infinity, reflect the high watermark after an empty fetch, shrink by the
     * number of consumed records after a non-empty fetch, and the per-partition
     * metric is removed on unsubscribe.
     */
    @Test
    public void testFetcherMetrics() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);

        MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax, new String[0]);
        MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup);
        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
        KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);

        // Nothing has been recorded yet.
        Assert.assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), 1.0E-4);

        // Empty fetch with high watermark 100 while positioned at 0: lag is 100.
        fetchRecords(tp1, MemoryRecords.EMPTY, Errors.NONE, 100L, 0);
        Assert.assertEquals(100.0, recordsFetchLagMax.value(), 1.0E-4);
        KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
        Assert.assertEquals(100.0, partitionLag.value(), 1.0E-4);

        // Three records consumed with high watermark 200: lag is 197.
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int v = 0; v < 3; v++) {
            // Explicit charset keeps the payload bytes independent of the platform default.
            builder.appendWithOffset(v, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + v).getBytes(StandardCharsets.UTF_8));
        }
        fetchRecords(tp1, builder.build(), Errors.NONE, 200L, 0);
        Assert.assertEquals(197.0, recordsFetchLagMax.value(), 1.0E-4);
        Assert.assertEquals(197.0, partitionLag.value(), 1.0E-4);

        subscriptions.unsubscribe();
        Assert.assertFalse(allMetrics.containsKey(partitionLagMetric));
    }

    /**
     * Verifies that with a READ_COMMITTED fetcher the lag metrics follow the last
     * stable offset passed to the fetch response (50, then 150) instead of the high
     * watermark (100, then 200), and that the per-partition metric is removed on
     * unsubscribe.
     */
    @Test
    public void testReadCommittedLagMetric() {
        // A private Metrics instance; note that it replaces the shared fetcher fixture for this test.
        Metrics metrics = new Metrics();
        this.fetcher = createFetcher(subscriptions, metrics, (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);

        MetricName maxLagMetric = metrics.metricInstance(this.metricsRegistry.recordsLagMax, new String[0]);
        MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup);
        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
        KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric);

        // Nothing has been recorded yet.
        Assert.assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), 1.0E-4);

        // Empty fetch: high watermark 100, last stable offset 50, position 0.
        fetchRecords(tp1, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0);
        Assert.assertEquals(50.0, recordsFetchLagMax.value(), 1.0E-4);
        KafkaMetric partitionLag = allMetrics.get(partitionLagMetric);
        Assert.assertEquals(50.0, partitionLag.value(), 1.0E-4);

        // Three records consumed: high watermark 200, last stable offset 150.
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int v = 0; v < 3; v++) {
            // Explicit charset keeps the payload bytes independent of the platform default.
            builder.appendWithOffset(v, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + v).getBytes(StandardCharsets.UTF_8));
        }
        fetchRecords(tp1, builder.build(), Errors.NONE, 200L, 150L, 0);
        Assert.assertEquals(147.0, recordsFetchLagMax.value(), 1.0E-4);
        Assert.assertEquals(147.0, partitionLag.value(), 1.0E-4);

        subscriptions.unsubscribe();
        Assert.assertFalse(allMetrics.containsKey(partitionLagMetric));
    }

    /**
     * Verifies that after fetching three records from offset 0 the fetch-size
     * average equals the summed record sizes and the records-per-request average
     * equals three.
     */
    @Test
    public void testFetchResponseMetrics() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 0L);

        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg, new String[0]));

        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int v = 0; v < 3; v++) {
            // Explicit charset keeps the payload bytes independent of the platform default.
            builder.appendWithOffset(v, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + v).getBytes(StandardCharsets.UTF_8));
        }
        MemoryRecords records = builder.build();

        // Expected fetch size: the sum of the sizes of all three records.
        int expectedBytes = 0;
        for (Record record : records.records()) {
            expectedBytes += record.sizeInBytes();
        }

        fetchRecords(tp1, records, Errors.NONE, 100L, 0);
        Assert.assertEquals(expectedBytes, fetchSizeAverage.value(), 1.0E-4);
        Assert.assertEquals(3.0, recordsCountAverage.value(), 1.0E-4);
    }

    /**
     * Verifies that when the consumer is positioned at offset 1 but the response
     * contains records from offset 0, only the records at or after the position
     * count towards the fetch-size and records-per-request metrics.
     */
    @Test
    public void testFetchResponseMetricsPartialResponse() {
        subscriptions.assignFromUser(Collections.singleton(tp1));
        subscriptions.seek(tp1, 1L);

        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg, new String[0]));

        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int v = 0; v < 3; v++) {
            // Explicit charset keeps the payload bytes independent of the platform default.
            builder.appendWithOffset(v, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + v).getBytes(StandardCharsets.UTF_8));
        }
        MemoryRecords records = builder.build();

        // Expected fetch size: only records at or after the fetch position (offset 1).
        int expectedBytes = 0;
        for (Record record : records.records()) {
            if (record.offset() >= 1L) {
                expectedBytes += record.sizeInBytes();
            }
        }

        fetchRecords(tp1, records, Errors.NONE, 100L, 0);
        Assert.assertEquals(expectedBytes, fetchSizeAverage.value(), 1.0E-4);
        Assert.assertEquals(2.0, recordsCountAverage.value(), 1.0E-4);
    }

    /**
     * Verifies that when one partition of a response carries records and the other
     * an OFFSET_OUT_OF_RANGE error, the fetch-size and records-per-request metrics
     * reflect only the records of the healthy partition.
     */
    @Test
    public void testFetchResponseMetricsWithOnePartitionError() {
        subscriptions.assignFromUser(Utils.mkSet(tp1, tp2));
        subscriptions.seek(tp1, 0L);
        subscriptions.seek(tp2, 0L);

        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg, new String[0]));

        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int v = 0; v < 3; v++) {
            // Explicit charset keeps the payload bytes independent of the platform default.
            builder.appendWithOffset(v, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + v).getBytes(StandardCharsets.UTF_8));
        }
        MemoryRecords records = builder.build();

        // tp1 succeeds with three records, tp2 fails with an out-of-range error.
        Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<TopicPartition, FetchResponse.PartitionData>();
        partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, null, records));
        partitions.put(tp2, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100L, -1L, 0L, null, MemoryRecords.EMPTY));

        Assert.assertEquals(1L, fetcher.sendFetches());
        client.prepareResponse(new FetchResponse(new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>(partitions), 0));
        consumerClient.poll(0L);
        fetcher.fetchedRecords();

        int expectedBytes = 0;
        for (Record record : records.records()) {
            expectedBytes += record.sizeInBytes();
        }
        Assert.assertEquals(expectedBytes, fetchSizeAverage.value(), 1.0E-4);
        Assert.assertEquals(3.0, recordsCountAverage.value(), 1.0E-4);
    }

    /**
     * Verifies that records returned for a partition whose position was moved by a
     * seek while the fetch was in flight do not count towards the fetch-size and
     * records-per-request metrics.
     */
    @Test
    public void testFetchResponseMetricsWithOnePartitionAtTheWrongOffset() {
        subscriptions.assignFromUser(Utils.mkSet(tp1, tp2));
        subscriptions.seek(tp1, 0L);
        subscriptions.seek(tp2, 0L);

        Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
        KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg, new String[0]));
        KafkaMetric recordsCountAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg, new String[0]));

        // Send the fetch, then move tp2 so its part of the response no longer matches its position.
        Assert.assertEquals(1L, fetcher.sendFetches());
        subscriptions.seek(tp2, 5L);

        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
        for (int v = 0; v < 3; v++) {
            // Explicit charset keeps the payload bytes independent of the platform default.
            builder.appendWithOffset(v, -1L, "key".getBytes(StandardCharsets.UTF_8), ("value-" + v).getBytes(StandardCharsets.UTF_8));
        }
        MemoryRecords records = builder.build();

        Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<TopicPartition, FetchResponse.PartitionData>();
        partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, null, records));
        partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100L, -1L, 0L, null, MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("val".getBytes(StandardCharsets.UTF_8)))));
        client.prepareResponse(new FetchResponse(new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>(partitions), 0));
        consumerClient.poll(0L);
        fetcher.fetchedRecords();

        // Only the three tp1 records are expected to be counted.
        int expectedBytes = 0;
        for (Record record : records.records()) {
            expectedBytes += record.sizeInBytes();
        }
        Assert.assertEquals(expectedBytes, fetchSizeAverage.value(), 1.0E-4);
        Assert.assertEquals(3.0, recordsCountAverage.value(), 1.0E-4);
    }

    /**
     * Convenience overload of
     * {@link #fetchRecords(TopicPartition, MemoryRecords, Errors, long, long, int)} that
     * passes {@code -1L} as the last stable offset.
     */
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
        final long lastStableOffset = -1L;
        return this.fetchRecords(tp, records, error, hw, lastStableOffset, throttleTime);
    }

    /**
     * Runs one fetch round trip against the mock client: asserts that exactly one fetch is
     * sent, queues a single-partition response built from the arguments, polls the network
     * client once and returns what the fetcher reports as fetched.
     *
     * @param tp               partition the response is for
     * @param records          records placed in the response
     * @param error            error code placed in the response
     * @param hw               high watermark placed in the response
     * @param lastStableOffset last stable offset placed in the response
     * @param throttleTime     throttle time placed in the response
     * @return the result of {@code fetcher.fetchedRecords()}
     */
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(TopicPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, int throttleTime) {
        Assert.assertEquals(1, this.fetcher.sendFetches());
        FetchResponse response = this.fetchResponse(tp, records, error, hw, lastStableOffset, throttleTime);
        this.client.prepareResponse(response);
        this.consumerClient.poll(0L);
        return this.fetcher.fetchedRecords();
    }

    /**
     * Expects {@code getOffsetsByTimes} to throw a {@link TimeoutException} when called with
     * a 100 ms timeout and no response has been prepared on the mock client.
     */
    @Test
    public void testGetOffsetsForTimesTimeout() {
        Map<TopicPartition, Long> timestampsToSearch = Collections.singletonMap(new TopicPartition(this.topicName, 2), 1000L);
        try {
            this.fetcher.getOffsetsByTimes(timestampsToSearch, 100L);
            Assert.fail("Should throw timeout exception.");
        }
        catch (TimeoutException expected) {
            // This is the outcome the test is looking for; nothing else to verify.
        }
    }

    /**
     * Exercises {@code getOffsetsByTimes} for an empty request and then, through
     * {@code testGetOffsetsForTimesWithError}, for several combinations of first-round error
     * codes on the two partitions.
     *
     * <p>Helper arguments are: error for partition 0, error for partition 1, offset returned
     * for partition 0, offset returned for partition 1, expected result for partition 0,
     * expected result for partition 1 ({@code null} meaning "no entry expected").
     */
    @Test
    public void testGetOffsetsForTimes() {
        // An empty search map is asserted to give an empty result.
        Map<TopicPartition, Long> noTimestamps = new HashMap<TopicPartition, Long>();
        Assert.assertTrue(this.fetcher.getOffsetsByTimes(noTimestamps, 100L).isEmpty());

        // No errors; an offset of -1 for partition 0 is expected to yield no entry.
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, 100L, null, 100L);
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, 10L, 100L, 10L, 100L);

        // First-round errors on one or both partitions; both offsets are still expected.
        this.testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.INVALID_REQUEST, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_FOR_PARTITION, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);

        // UNSUPPORTED_FOR_MESSAGE_FORMAT on partition 0 is expected to yield no entry for it.
        this.testGetOffsetsForTimesWithError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Errors.NONE, 10L, 100L, null, 100L);
        this.testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L);
    }

    /**
     * Queues a single list-offsets response that carries an error for both {@code tp1} and
     * {@code tp2}, then looks up offsets with a zero timeout. The test passes only if a
     * {@link TimeoutException} is thrown.
     */
    @Test(expected=TimeoutException.class)
    public void testBatchedListOffsetsMetadataErrors() {
        Map<TopicPartition, ListOffsetResponse.PartitionData> errorsByPartition = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
        errorsByPartition.put(this.tp1, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION, -1L, -1L));
        errorsByPartition.put(this.tp2, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, -1L, -1L));
        this.client.prepareResponse(new ListOffsetResponse(0, errorsByPartition));

        // NOTE(review): -2L is presumably the "earliest" timestamp sentinel -- confirm.
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<TopicPartition, Long>();
        timestampsToSearch.put(this.tp1, -2L);
        timestampsToSearch.put(this.tp2, -2L);
        this.fetcher.getOffsetsByTimes(timestampsToSearch, 0L);
    }

    /**
     * With a READ_COMMITTED fetcher, feeds a response holding two transactional records
     * followed by an abort marker, with the transaction listed as aborted. Expects no
     * records to be returned for {@code tp1}.
     */
    @Test
    public void testSkippingAbortedTransactions() {
        Fetcher<byte[], byte[]> readCommittedFetcher = this.createFetcher(this.subscriptions, new Metrics(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        int nextOffset = 0;
        nextOffset += this.appendTransactionalRecords(buffer, 1L, nextOffset, new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()));
        this.abortTransaction(buffer, 1L, nextOffset);
        buffer.flip();

        List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords records = MemoryRecords.readableRecords(buffer);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp1));
        this.subscriptions.seek(this.tp1, 0L);
        Assert.assertEquals(1, readCommittedFetcher.sendFetches());
        Assert.assertFalse(readCommittedFetcher.hasCompletedFetches());

        this.client.prepareResponse(this.fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(readCommittedFetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched = readCommittedFetcher.fetchedRecords();
        Assert.assertFalse(fetched.containsKey(this.tp1));
    }

    /**
     * With a READ_COMMITTED fetcher, feeds a response holding two transactional records
     * followed by a commit marker and an empty aborted-transaction list. Expects both
     * records to be returned for {@code tp1}, and checks that the outgoing fetch request
     * carries the READ_COMMITTED isolation level.
     */
    @Test
    public void testReturnCommittedTransactions() {
        Fetcher<byte[], byte[]> fetcher = this.createFetcher(this.subscriptions, new Metrics(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        int currentOffset = 0;
        currentOffset += this.appendTransactionalRecords(buffer, 1L, currentOffset, new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()));
        // The marker is the last entry written, so the running offset is not needed afterwards.
        this.commitTransaction(buffer, 1L, currentOffset);
        buffer.flip();

        List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        MemoryRecords records = MemoryRecords.readableRecords(buffer);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp1));
        this.subscriptions.seek(this.tp1, 0L);
        Assert.assertEquals(1, fetcher.sendFetches());
        Assert.assertFalse(fetcher.hasCompletedFetches());

        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                // The matcher doubles as an assertion on the request that was sent.
                FetchRequest request = (FetchRequest)body;
                Assert.assertEquals(IsolationLevel.READ_COMMITTED, request.isolationLevel());
                return true;
            }
        }, this.fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(fetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
        Assert.assertTrue(fetchedRecords.containsKey(this.tp1));
        // Expected value first, so a failure message reports the two sides correctly.
        Assert.assertEquals(2, fetchedRecords.get(this.tp1).size());
    }

    /**
     * With a READ_COMMITTED fetcher, feeds a response in which two producers interleave
     * committed and aborted transactions. Expects exactly the keys written in the committed
     * transactions ({@code commit1-1}, {@code commit1-2}, {@code commit2-1}) to be returned.
     */
    @Test
    public void testReadCommittedWithCommittedAndAbortedTransactions() {
        Fetcher<byte[], byte[]> fetcher = this.createFetcher(this.subscriptions, new Metrics(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        long pid1 = 1L;
        long pid2 = 2L;

        // pid1 commits offsets 0-1; pid2's transaction starting at offset 2 is aborted at 5.
        this.appendTransactionalRecords(buffer, pid1, 0L, new SimpleRecord("commit1-1".getBytes(), "value".getBytes()), new SimpleRecord("commit1-2".getBytes(), "value".getBytes()));
        this.appendTransactionalRecords(buffer, pid2, 2L, new SimpleRecord("abort2-1".getBytes(), "value".getBytes()));
        this.commitTransaction(buffer, pid1, 3L);
        this.appendTransactionalRecords(buffer, pid2, 4L, new SimpleRecord("abort2-2".getBytes(), "value".getBytes()));
        this.abortTransaction(buffer, pid2, 5L);
        abortedTransactions.add(new FetchResponse.AbortedTransaction(pid2, 2L));

        // pid1's transaction starting at offset 6 is aborted at 9; pid2 commits offset 7 at 10.
        this.appendTransactionalRecords(buffer, pid1, 6L, new SimpleRecord("abort1-1".getBytes(), "value".getBytes()));
        this.appendTransactionalRecords(buffer, pid2, 7L, new SimpleRecord("commit2-1".getBytes(), "value".getBytes()));
        this.appendTransactionalRecords(buffer, pid1, 8L, new SimpleRecord("abort1-2".getBytes(), "value".getBytes()));
        this.abortTransaction(buffer, pid1, 9L);
        abortedTransactions.add(new FetchResponse.AbortedTransaction(pid1, 6L));
        this.commitTransaction(buffer, pid2, 10L);
        buffer.flip();

        MemoryRecords records = MemoryRecords.readableRecords(buffer);
        this.subscriptions.assignFromUser(Collections.singleton(this.tp1));
        this.subscriptions.seek(this.tp1, 0L);
        Assert.assertEquals(1, fetcher.sendFetches());
        Assert.assertFalse(fetcher.hasCompletedFetches());

        this.client.prepareResponse(this.fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(fetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
        Assert.assertTrue(fetchedRecords.containsKey(this.tp1));

        // The list is fully typed so the enhanced-for below has a ConsumerRecord element type.
        List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(this.tp1);
        Set<String> fetchedKeys = new HashSet<String>();
        for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) {
            fetchedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8));
        }
        Assert.assertEquals(Utils.mkSet("commit1-1", "commit1-2", "commit2-1"), fetchedKeys);
    }

    /**
     * With a READ_COMMITTED fetcher, feeds a response where one producer's aborted
     * transaction is followed by two consecutive abort markers and then by a committed
     * transaction. Expects only the two committed records ({@code commit1-1},
     * {@code commit1-2}) to be returned.
     */
    @Test
    public void testMultipleAbortMarkers() {
        Fetcher<byte[], byte[]> fetcher = this.createFetcher(this.subscriptions, new Metrics(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        int currentOffset = 0;
        currentOffset += this.appendTransactionalRecords(buffer, 1L, currentOffset, new SimpleRecord(this.time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
        // Two abort markers in a row for the same producer.
        currentOffset += this.abortTransaction(buffer, 1L, currentOffset);
        currentOffset += this.abortTransaction(buffer, 1L, currentOffset);
        currentOffset += this.appendTransactionalRecords(buffer, 1L, currentOffset, new SimpleRecord(this.time.milliseconds(), "commit1-1".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "commit1-2".getBytes(), "value".getBytes()));
        this.commitTransaction(buffer, 1L, currentOffset);
        buffer.flip();

        List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords records = MemoryRecords.readableRecords(buffer);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp1));
        this.subscriptions.seek(this.tp1, 0L);
        Assert.assertEquals(1, fetcher.sendFetches());
        Assert.assertFalse(fetcher.hasCompletedFetches());

        this.client.prepareResponse(this.fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(fetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords();
        Assert.assertTrue(fetchedRecords.containsKey(this.tp1));

        List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(this.tp1);
        // Expected value first, so a failure message reports the two sides correctly.
        Assert.assertEquals(2, fetchedConsumerRecords.size());

        Set<String> committedKeys = new HashSet<String>(Arrays.asList("commit1-1", "commit1-2"));
        Set<String> actuallyCommittedKeys = new HashSet<String>();
        for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) {
            actuallyCommittedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8));
        }
        // assertEquals prints both sets on failure, unlike assertTrue(a.equals(b)).
        Assert.assertEquals(committedKeys, actuallyCommittedKeys);
    }

    /**
     * With a READ_COMMITTED fetcher, feeds a response that begins with an abort marker at
     * offset 5 (no preceding data for that transaction), followed by three records at
     * offsets 6-8 and a commit marker at offset 9. Expects the records at offsets 6, 7 and
     * 8 to be returned.
     */
    @Test
    public void testReadCommittedAbortMarkerWithNoData() {
        Fetcher<String, String> stringFetcher = this.createFetcher(this.subscriptions, new Metrics(), new StringDeserializer(), new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        long producerId = 1L;

        this.abortTransaction(buffer, producerId, 5L);
        this.appendTransactionalRecords(buffer, producerId, 6L, new SimpleRecord("6".getBytes(), null), new SimpleRecord("7".getBytes(), null), new SimpleRecord("8".getBytes(), null));
        this.commitTransaction(buffer, producerId, 9L);
        buffer.flip();

        this.subscriptions.assignFromUser(Collections.singleton(this.tp1));
        this.subscriptions.seek(this.tp1, 0L);
        Assert.assertEquals(1, stringFetcher.sendFetches());

        List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(producerId, 0L));
        MemoryRecords records = MemoryRecords.readableRecords(buffer);
        this.client.prepareResponse(this.fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(stringFetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<String, String>>> allFetchedRecords = stringFetcher.fetchedRecords();
        Assert.assertTrue(allFetchedRecords.containsKey(this.tp1));
        List<ConsumerRecord<String, String>> recordsForTp1 = allFetchedRecords.get(this.tp1);
        Assert.assertEquals(3, recordsForTp1.size());
        Assert.assertEquals(Arrays.asList(6L, 7L, 8L), this.collectRecordOffsets(recordsForTp1));
    }

    /**
     * Builds a four-record batch whose last record has a {@code null} key, filters out
     * records with a {@code null} key, and feeds the filtered batch to the fetcher. Expects
     * the three keyed records to be returned in order and the consumer position for
     * {@code tp1} to be 4.
     */
    @Test
    public void testUpdatePositionWithLastRecordMissingFromBatch() {
        MemoryRecords original = MemoryRecords.withRecords(CompressionType.NONE,
                new SimpleRecord("0".getBytes(), "v".getBytes()),
                new SimpleRecord("1".getBytes(), "v".getBytes()),
                new SimpleRecord("2".getBytes(), "v".getBytes()),
                new SimpleRecord(null, "value".getBytes()));

        // Keep only records that have a key; this removes the final record.
        MemoryRecords.RecordFilter keepKeyedRecords = new MemoryRecords.RecordFilter(){

            @Override
            protected MemoryRecords.RecordFilter.BatchRetention checkBatchRetention(RecordBatch batch) {
                return MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY;
            }

            @Override
            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                return record.key() != null;
            }
        };
        MemoryRecords.FilterResult filtered = original.filterTo(this.tp1, keepKeyedRecords, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
        filtered.output.flip();
        MemoryRecords compactedRecords = MemoryRecords.readableRecords(filtered.output);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp1));
        this.subscriptions.seek(this.tp1, 0L);
        Assert.assertEquals(1, this.fetcher.sendFetches());
        this.client.prepareResponse(this.fetchResponse(this.tp1, compactedRecords, Errors.NONE, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(this.fetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> allFetchedRecords = this.fetcher.fetchedRecords();
        Assert.assertTrue(allFetchedRecords.containsKey(this.tp1));
        List<ConsumerRecord<byte[], byte[]>> recordsForTp1 = allFetchedRecords.get(this.tp1);
        Assert.assertEquals(3, recordsForTp1.size());
        for (int i = 0; i < 3; i++) {
            String expectedKey = Integer.toString(i);
            String actualKey = new String(recordsForTp1.get(i).key());
            Assert.assertEquals(expectedKey, actualKey);
        }
        Assert.assertEquals(4L, (long) this.subscriptions.position(this.tp1));
    }

    /**
     * Feeds the fetcher a response that contains only an empty batch header covering
     * offsets 37 through 54. Expects no records to be returned and the consumer position
     * for {@code tp1} to be one past the batch's last offset.
     */
    @Test
    public void testUpdatePositionOnEmptyBatch() {
        final long producerId = 1L;
        final short producerEpoch = 0;
        final int sequence = 1;
        final long baseOffset = 37L;
        final long lastOffset = 54L;
        final int partitionLeaderEpoch = 7;

        // NOTE(review): 61 presumably equals the v2 record batch header size and (byte) 2
        // the v2 magic value -- confirm against DefaultRecordBatch / RecordBatch.
        ByteBuffer headerOnly = ByteBuffer.allocate(61);
        DefaultRecordBatch.writeEmptyHeader(headerOnly, (byte) 2, producerId, producerEpoch, sequence, baseOffset, lastOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME, System.currentTimeMillis(), false, false);
        headerOnly.flip();
        MemoryRecords recordsWithEmptyBatch = MemoryRecords.readableRecords(headerOnly);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp1));
        this.subscriptions.seek(this.tp1, 0L);
        Assert.assertEquals(1, this.fetcher.sendFetches());
        this.client.prepareResponse(this.fetchResponse(this.tp1, recordsWithEmptyBatch, Errors.NONE, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(this.fetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> allFetchedRecords = this.fetcher.fetchedRecords();
        Assert.assertTrue(allFetchedRecords.isEmpty());
        Assert.assertEquals(lastOffset + 1L, (long) this.subscriptions.position(this.tp1));
    }

    /**
     * With a READ_COMMITTED fetcher, feeds a response whose batches have gaps between their
     * offsets and come from three producers. {@code pid2} and {@code pid1} are listed as
     * aborted, {@code pid3} is committed. Expects only {@code pid3}'s records (offsets 3, 4,
     * 30, 31, 32) to be returned.
     */
    @Test
    public void testReadCommittedWithCompactedTopic() {
        Fetcher<String, String> stringFetcher = this.createFetcher(this.subscriptions, new Metrics(), new StringDeserializer(), new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        long pid1 = 1L;
        long pid2 = 2L;
        long pid3 = 3L;

        this.appendTransactionalRecords(buffer, pid3, 3L, new SimpleRecord("3".getBytes(), "value".getBytes()), new SimpleRecord("4".getBytes(), "value".getBytes()));
        this.appendTransactionalRecords(buffer, pid2, 15L, new SimpleRecord("15".getBytes(), "value".getBytes()), new SimpleRecord("16".getBytes(), "value".getBytes()), new SimpleRecord("17".getBytes(), "value".getBytes()));
        this.appendTransactionalRecords(buffer, pid1, 22L, new SimpleRecord("22".getBytes(), "value".getBytes()), new SimpleRecord("23".getBytes(), "value".getBytes()));
        this.abortTransaction(buffer, pid2, 28L);
        this.appendTransactionalRecords(buffer, pid3, 30L, new SimpleRecord("30".getBytes(), "value".getBytes()), new SimpleRecord("31".getBytes(), "value".getBytes()), new SimpleRecord("32".getBytes(), "value".getBytes()));
        this.commitTransaction(buffer, pid3, 35L);
        // pid1 has no end marker inside this buffer.
        this.appendTransactionalRecords(buffer, pid1, 39L, new SimpleRecord("39".getBytes(), "value".getBytes()), new SimpleRecord("40".getBytes(), "value".getBytes()));
        buffer.flip();

        this.subscriptions.assignFromUser(Collections.singleton(this.tp1));
        this.subscriptions.seek(this.tp1, 0L);
        Assert.assertEquals(1, stringFetcher.sendFetches());

        List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(pid2, 6L));
        abortedTransactions.add(new FetchResponse.AbortedTransaction(pid1, 0L));
        MemoryRecords records = MemoryRecords.readableRecords(buffer);
        this.client.prepareResponse(this.fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(stringFetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<String, String>>> allFetchedRecords = stringFetcher.fetchedRecords();
        Assert.assertTrue(allFetchedRecords.containsKey(this.tp1));
        List<ConsumerRecord<String, String>> recordsForTp1 = allFetchedRecords.get(this.tp1);
        Assert.assertEquals(5, recordsForTp1.size());
        Assert.assertEquals(Arrays.asList(3L, 4L, 30L, 31L, 32L), this.collectRecordOffsets(recordsForTp1));
    }

    /**
     * With a READ_UNCOMMITTED fetcher, feeds a response holding two transactional records
     * followed by an abort marker, with the transaction listed as aborted. Expects records
     * to be returned for {@code tp1} anyway.
     */
    @Test
    public void testReturnAbortedTransactionsinUncommittedMode() {
        Fetcher<byte[], byte[]> readUncommittedFetcher = this.createFetcher(this.subscriptions, new Metrics(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        int nextOffset = 0;
        nextOffset += this.appendTransactionalRecords(buffer, 1L, nextOffset, new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "key".getBytes(), "value".getBytes()));
        this.abortTransaction(buffer, 1L, nextOffset);
        buffer.flip();

        List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords records = MemoryRecords.readableRecords(buffer);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp1));
        this.subscriptions.seek(this.tp1, 0L);
        Assert.assertEquals(1, readUncommittedFetcher.sendFetches());
        Assert.assertFalse(readUncommittedFetcher.hasCompletedFetches());

        this.client.prepareResponse(this.fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(readUncommittedFetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched = readUncommittedFetcher.fetchedRecords();
        Assert.assertTrue(fetched.containsKey(this.tp1));
    }

    /**
     * With a READ_COMMITTED fetcher, feeds a response holding only an aborted transaction
     * (two records plus the abort marker). Expects no records for {@code tp1}, and expects
     * the consumer position to equal the number of entries written (records plus marker).
     */
    @Test
    public void testConsumerPositionUpdatedWhenSkippingAbortedTransactions() {
        Fetcher<byte[], byte[]> readCommittedFetcher = this.createFetcher(this.subscriptions, new Metrics(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        long nextOffset = 0L;
        nextOffset += this.appendTransactionalRecords(buffer, 1L, nextOffset, new SimpleRecord(this.time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()), new SimpleRecord(this.time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
        nextOffset += this.abortTransaction(buffer, 1L, nextOffset);
        buffer.flip();

        List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<FetchResponse.AbortedTransaction>();
        abortedTransactions.add(new FetchResponse.AbortedTransaction(1L, 0L));
        MemoryRecords records = MemoryRecords.readableRecords(buffer);

        this.subscriptions.assignFromUser(Collections.singleton(this.tp1));
        this.subscriptions.seek(this.tp1, 0L);
        Assert.assertEquals(1, readCommittedFetcher.sendFetches());
        Assert.assertFalse(readCommittedFetcher.hasCompletedFetches());

        this.client.prepareResponse(this.fetchResponseWithAbortedTransactions(records, abortedTransactions, Errors.NONE, 100L, 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(readCommittedFetcher.hasCompletedFetches());

        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetched = readCommittedFetcher.fetchedRecords();
        Assert.assertFalse(fetched.containsKey(this.tp1));
        // nextOffset now points just past the abort marker.
        Assert.assertEquals(nextOffset, (long) this.subscriptions.position(this.tp1));
    }

    /**
     * Appends one transactional batch containing {@code records} to {@code buffer}.
     *
     * @param buffer       destination buffer
     * @param pid          producer id for the batch
     * @param baseOffset   offset of the first record in the batch
     * @param baseSequence sequence number of the first record in the batch
     * @param records      records to append, in order
     * @return the number of records appended
     */
    private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, int baseSequence, SimpleRecord ... records) {
        // NOTE(review): (byte) 2 is presumably the v2 magic value, (short) 0 the producer
        // epoch and -1 "no partition leader epoch" -- confirm against MemoryRecords.builder.
        MemoryRecordsBuilder batchBuilder = MemoryRecords.builder(buffer, (byte) 2, CompressionType.NONE, TimestampType.CREATE_TIME, baseOffset, this.time.milliseconds(), pid, (short) 0, baseSequence, true, -1);
        for (int i = 0; i < records.length; i++) {
            batchBuilder.append(records[i]);
        }
        batchBuilder.build();
        return records.length;
    }

    /**
     * Convenience overload that uses {@code baseOffset}, narrowed to {@code int}, as the
     * base sequence of the batch.
     *
     * @return the number of records appended
     */
    private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, SimpleRecord ... records) {
        int baseSequence = (int) baseOffset;
        return this.appendTransactionalRecords(buffer, pid, baseOffset, baseSequence, records);
    }

    /**
     * Writes a COMMIT end-transaction marker for {@code producerId} at {@code baseOffset}
     * into {@code buffer}, using producer epoch 0 and partition leader epoch 0.
     *
     * @return 1, the number of offsets the marker occupies
     */
    private int commitTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
        final int partitionLeaderEpoch = 0;
        final short producerEpoch = 0;
        EndTransactionMarker commitMarker = new EndTransactionMarker(ControlRecordType.COMMIT, 0);
        MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, this.time.milliseconds(), partitionLeaderEpoch, producerId, producerEpoch, commitMarker);
        return 1;
    }

    /**
     * Writes an ABORT end-transaction marker for {@code producerId} at {@code baseOffset}
     * into {@code buffer}, using producer epoch 0 and partition leader epoch 0.
     *
     * @return 1, the number of offsets the marker occupies
     */
    private int abortTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
        final int partitionLeaderEpoch = 0;
        final short producerEpoch = 0;
        EndTransactionMarker abortMarker = new EndTransactionMarker(ControlRecordType.ABORT, 0);
        MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, this.time.milliseconds(), partitionLeaderEpoch, producerId, producerEpoch, abortMarker);
        return 1;
    }

    /**
     * Helper that drives one {@code getOffsetsByTimes} lookup for two partitions of the
     * test topic.
     *
     * <p>Four responses are queued: a first round carrying the supplied error codes and a
     * second round carrying {@code Errors.NONE}, each addressed to the partition's leader.
     * Every response uses the same value for both timestamp and offset, so the assertions
     * compare the expected value with both fields of the result.
     *
     * @param errorForTp0          first-round error for partition 0
     * @param errorForTp1          first-round error for partition 1
     * @param offsetForTp0         timestamp/offset returned for partition 0
     * @param offsetForTp1         timestamp/offset returned for partition 1
     * @param expectedOffsetForTp0 expected result for partition 0, or {@code null} if no
     *                             entry is expected
     * @param expectedOffsetForTp1 expected result for partition 1, or {@code null} if no
     *                             entry is expected
     */
    private void testGetOffsetsForTimesWithError(Errors errorForTp0, Errors errorForTp1, long offsetForTp0, long offsetForTp1, Long expectedOffsetForTp0, Long expectedOffsetForTp1) {
        this.client.reset();
        // The local tp1 below shadows the field, so the field is read via this.tp1.
        TopicPartition tp0 = this.tp1;
        TopicPartition tp1 = new TopicPartition(this.topicName, 1);
        Cluster twoNodeCluster = TestUtils.clusterWith(2, this.topicName, 2);
        this.metadata.update(twoNodeCluster, Collections.<String>emptySet(), this.time.milliseconds());

        // First round: responses with the caller-supplied error codes.
        this.client.prepareResponseFrom(this.listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), twoNodeCluster.leaderFor(tp0));
        this.client.prepareResponseFrom(this.listOffsetResponse(tp1, errorForTp1, offsetForTp1, offsetForTp1), twoNodeCluster.leaderFor(tp1));
        // Second round: error-free responses for both partitions.
        this.client.prepareResponseFrom(this.listOffsetResponse(tp0, Errors.NONE, offsetForTp0, offsetForTp0), twoNodeCluster.leaderFor(tp0));
        this.client.prepareResponseFrom(this.listOffsetResponse(tp1, Errors.NONE, offsetForTp1, offsetForTp1), twoNodeCluster.leaderFor(tp1));

        Map<TopicPartition, Long> timestampToSearch = new HashMap<TopicPartition, Long>();
        timestampToSearch.put(tp0, 0L);
        timestampToSearch.put(tp1, 0L);
        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = this.fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE);

        OffsetAndTimestamp resultForTp0 = offsetAndTimestampMap.get(tp0);
        if (expectedOffsetForTp0 != null) {
            Assert.assertEquals(expectedOffsetForTp0.longValue(), resultForTp0.timestamp());
            Assert.assertEquals(expectedOffsetForTp0.longValue(), resultForTp0.offset());
        } else {
            Assert.assertNull(resultForTp0);
        }

        OffsetAndTimestamp resultForTp1 = offsetAndTimestampMap.get(tp1);
        if (expectedOffsetForTp1 != null) {
            Assert.assertEquals(expectedOffsetForTp1.longValue(), resultForTp1.timestamp());
            Assert.assertEquals(expectedOffsetForTp1.longValue(), resultForTp1.offset());
        } else {
            Assert.assertNull(resultForTp1);
        }
    }

    /**
     * Builds a matcher that accepts a {@code ListOffsetRequest} only if the timestamp it
     * requests for {@code tp1} equals {@code timestamp}.
     *
     * <p>A request that has no entry for {@code tp1} is reported as "no match" rather than
     * failing with a {@code NullPointerException} while unboxing the missing value.
     *
     * @param timestamp the timestamp the request is expected to carry for {@code tp1}
     * @return a matcher for the mock client
     */
    private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
        return new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                ListOffsetRequest req = (ListOffsetRequest)body;
                Long requested = (Long)req.partitionTimestamps().get(FetcherTest.this.tp1);
                // Guard the unboxing: a missing partition simply does not match.
                return requested != null && requested.longValue() == timestamp;
            }
        };
    }

    /**
     * Convenience overload of
     * {@link #listOffsetResponse(TopicPartition, Errors, long, long)} that targets
     * {@code tp1}.
     */
    private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, long offset) {
        TopicPartition partition = this.tp1;
        return this.listOffsetResponse(partition, error, timestamp, offset);
    }

    /**
     * Builds a {@code ListOffsetResponse} holding a single partition entry.
     *
     * @param tp        partition the entry is for
     * @param error     error code of the entry
     * @param timestamp timestamp of the entry
     * @param offset    offset of the entry
     * @return a response whose only partition data is the entry described above
     */
    private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
        HashMap<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
        responseData.put(tp, new ListOffsetResponse.PartitionData(error, timestamp, offset));
        return new ListOffsetResponse(responseData);
    }

    /**
     * Builds a {@code FetchResponse} for {@code tp1} that carries a list of aborted
     * transactions. The fourth {@code PartitionData} argument is fixed at {@code 0L}.
     *
     * @param records             records placed in the response
     * @param abortedTransactions aborted transactions placed in the response
     * @param error               error code for the partition
     * @param lastStableOffset    last stable offset for the partition
     * @param hw                  high watermark for the partition
     * @param throttleTime        throttle time of the response
     * @return a response containing a single entry for {@code tp1}
     */
    private FetchResponse fetchResponseWithAbortedTransactions(MemoryRecords records, List<FetchResponse.AbortedTransaction> abortedTransactions, Errors error, long lastStableOffset, long hw, int throttleTime) {
        FetchResponse.PartitionData partitionData = new FetchResponse.PartitionData(error, hw, lastStableOffset, 0L, abortedTransactions, records);
        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>();
        responseData.put(this.tp1, partitionData);
        return new FetchResponse(responseData, throttleTime);
    }

    /**
     * Convenience overload of
     * {@link #fetchResponse(TopicPartition, MemoryRecords, Errors, long, long, int)} that
     * passes {@code -1L} as the last stable offset.
     */
    private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, int throttleTime) {
        final long lastStableOffset = -1L;
        return this.fetchResponse(tp, records, error, hw, lastStableOffset, throttleTime);
    }

    /**
     * Builds a {@code FetchResponse} holding a single partition entry with no aborted
     * transactions. The fourth {@code PartitionData} argument is fixed at {@code 0L}.
     *
     * @param tp               partition the entry is for
     * @param records          records placed in the response
     * @param error            error code for the partition
     * @param hw               high watermark for the partition
     * @param lastStableOffset last stable offset for the partition
     * @param throttleTime     throttle time of the response
     * @return a response containing a single entry for {@code tp}
     */
    private FetchResponse fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw, long lastStableOffset, int throttleTime) {
        FetchResponse.PartitionData partitionData = new FetchResponse.PartitionData(error, hw, lastStableOffset, 0L, null, records);
        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>();
        responseData.put(tp, partitionData);
        return new FetchResponse(responseData, throttleTime);
    }

    /**
     * Builds a {@code MetadataResponse} describing a single topic.
     *
     * <p>When {@code error} is {@code Errors.NONE}, partition metadata is copied from the
     * test cluster for every partition of {@code topic}; for any other error the topic is
     * reported with an empty partition list.
     *
     * @param topic topic to describe
     * @param error topic-level error code to report
     * @return a response with the test cluster's nodes and one topic entry
     */
    private MetadataResponse newMetadataResponse(String topic, Errors error) {
        List<MetadataResponse.PartitionMetadata> partitionsMetadata = new ArrayList<MetadataResponse.PartitionMetadata>();
        if (error == Errors.NONE) {
            for (PartitionInfo info : this.cluster.partitionsForTopic(topic)) {
                MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(
                        Errors.NONE,
                        info.partition(),
                        info.leader(),
                        Arrays.asList(info.replicas()),
                        Arrays.asList(info.inSyncReplicas()));
                partitionsMetadata.add(partitionMetadata);
            }
        }
        MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata);
        // NOTE(review): null / -1 are presumably "no cluster id" / "no controller" -- confirm.
        return new MetadataResponse(this.cluster.nodes(), null, -1, Arrays.asList(topicMetadata));
    }

    /**
     * Creates a byte-array fetcher with the given poll limit and READ_UNCOMMITTED
     * isolation.
     *
     * @param subscriptions  subscription state the fetcher works on
     * @param metrics        metrics registry for the fetcher
     * @param maxPollRecords maximum number of records returned per poll
     */
    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics, int maxPollRecords) {
        Deserializer<byte[]> keyDeserializer = new ByteArrayDeserializer();
        Deserializer<byte[]> valueDeserializer = new ByteArrayDeserializer();
        return this.createFetcher(subscriptions, metrics, keyDeserializer, valueDeserializer, maxPollRecords, IsolationLevel.READ_UNCOMMITTED);
    }

    /**
     * Creates a byte-array fetcher with {@code Integer.MAX_VALUE} as the poll limit.
     *
     * @param subscriptions subscription state the fetcher works on
     * @param metrics       metrics registry for the fetcher
     */
    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
        final int maxPollRecords = Integer.MAX_VALUE;
        return this.createFetcher(subscriptions, metrics, maxPollRecords);
    }

    /**
     * Creates a fetcher with custom deserializers, {@code Integer.MAX_VALUE} as the poll
     * limit and READ_UNCOMMITTED isolation.
     *
     * @param subscriptions     subscription state the fetcher works on
     * @param metrics           metrics registry for the fetcher
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     */
    private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions, Metrics metrics, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        final int maxPollRecords = Integer.MAX_VALUE;
        final IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;
        return this.createFetcher(subscriptions, metrics, keyDeserializer, valueDeserializer, maxPollRecords, isolationLevel);
    }

    /**
     * Creates a fetcher wired to this test's network client, metadata, metrics registry and
     * clock, using the fixture's byte/wait/size limits and retry backoff.
     *
     * @param subscriptions     subscription state the fetcher works on
     * @param metrics           metrics registry for the fetcher
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @param maxPollRecords    maximum number of records returned per poll
     * @param isolationLevel    isolation level the fetcher requests
     */
    private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions, Metrics metrics, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int maxPollRecords, IsolationLevel isolationLevel) {
        return new Fetcher<K, V>(
                this.consumerClient,
                this.minBytes,
                this.maxBytes,
                this.maxWaitMs,
                this.fetchSize,
                maxPollRecords,
                true, // NOTE(review): presumably the "check CRCs" flag -- confirm against Fetcher
                keyDeserializer,
                valueDeserializer,
                this.metadata,
                subscriptions,
                metrics,
                this.metricsRegistry,
                this.time,
                this.retryBackoffMs,
                isolationLevel);
    }

    /**
     * Extracts the offset of every record, preserving list order.
     *
     * @param records records to read offsets from
     * @return a new list with one offset per input record
     */
    private <T> List<Long> collectRecordOffsets(List<ConsumerRecord<T, T>> records) {
        ArrayList<Long> offsets = new ArrayList<Long>(records.size());
        Iterator<ConsumerRecord<T, T>> it = records.iterator();
        while (it.hasNext()) {
            offsets.add(Long.valueOf(it.next().offset()));
        }
        return offsets;
    }
}

