/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.FetchMetricsRegistry;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.OffsetFetcher;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

public class OffsetFetcherTest {
    private final String topicName = "test";
    private final Uuid topicId = Uuid.randomUuid();
    private final Map<String, Uuid> topicIds = new HashMap<String, Uuid>(Map.of("test", this.topicId));
    private final TopicPartition tp0 = new TopicPartition("test", 0);
    private final TopicPartition tp1 = new TopicPartition("test", 1);
    private final TopicPartition tp2 = new TopicPartition("test", 2);
    private final TopicPartition tp3 = new TopicPartition("test", 3);
    private final int validLeaderEpoch = 0;
    private final MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWithIds(1, Collections.singletonMap("test", 4), this.topicIds);
    private final int requestTimeoutMs = 30000;
    private final long retryBackoffMs = 100L;
    private MockTime time = new MockTime(1L);
    private SubscriptionState subscriptions;
    private ConsumerMetadata metadata;
    private MockClient client;
    private Metrics metrics;
    private final ApiVersions apiVersions = new ApiVersions();
    private ConsumerNetworkClient consumerClient;
    private OffsetFetcher offsetFetcher;

    @BeforeEach
    public void setup() {
    }

    private void assignFromUser(Set<TopicPartition> partitions) {
        this.subscriptions.assignFromUser(partitions);
        this.client.updateMetadata(this.initialUpdateResponse);
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("test", 4), tp -> 0, this.topicIds), false, 0L);
    }

    @AfterEach
    public void teardown() throws Exception {
        if (this.metrics != null) {
            this.metrics.close();
        }
    }

    @Test
    public void testUpdateFetchPositionNoOpWithPositionSet() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 5L);
        this.offsetFetcher.resetPositionsIfNeeded();
        Assertions.assertFalse((boolean)this.client.hasInFlightRequests());
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionResetToDefaultOffset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-2L, 0), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionResetToLatestOffset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.client.updateMetadata(this.initialUpdateResponse);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionResetToDurationOffset() {
        long timestamp = Instant.now().toEpochMilli();
        AutoOffsetResetStrategy durationStrategy = (AutoOffsetResetStrategy)Mockito.mock(AutoOffsetResetStrategy.class);
        Mockito.when((Object)durationStrategy.timestamp()).thenReturn(Optional.of(timestamp));
        this.buildFetcher(durationStrategy);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, durationStrategy);
        this.client.updateMetadata(this.initialUpdateResponse);
        this.client.prepareResponse(this.listOffsetRequestMatcher(timestamp), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testFetchOffsetErrors() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L, 0), (AbstractResponse)this.listOffsetResponse(Errors.OFFSET_NOT_AVAILABLE, 1L, 5L), false);
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        this.time.sleep(100L);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L, 0), (AbstractResponse)this.listOffsetResponse(Errors.LEADER_NOT_AVAILABLE, 1L, 5L), false);
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        this.time.sleep(100L);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L), false);
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertTrue((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertEquals((long)this.subscriptions.position((TopicPartition)this.tp0).offset, (long)5L);
    }

    @Test
    public void testListOffsetSendsReadUncommitted() {
        this.testListOffsetsSendsIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
    }

    @Test
    public void testListOffsetSendsReadCommitted() {
        this.testListOffsetsSendsIsolationLevel(IsolationLevel.READ_COMMITTED);
    }

    private void testListOffsetsSendsIsolationLevel(IsolationLevel isolationLevel) {
        this.buildFetcher(isolationLevel);
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.client.prepareResponse(body -> {
            ListOffsetsRequest request = (ListOffsetsRequest)body;
            Assertions.assertEquals((int)30000, (int)request.timeoutMs());
            return request.isolationLevel() == isolationLevel;
        }, (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testresetPositionsSkipsBlackedOutConnections() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.EARLIEST);
        this.client.updateMetadata(this.initialUpdateResponse);
        Node node = (Node)this.initialUpdateResponse.brokers().iterator().next();
        this.client.backoff(node, 500L);
        this.offsetFetcher.resetPositionsIfNeeded();
        Assertions.assertEquals((int)0, (int)this.consumerClient.pendingRequestCount());
        this.consumerClient.pollNoWakeup();
        Assertions.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertEquals((Object)AutoOffsetResetStrategy.EARLIEST, (Object)this.subscriptions.resetStrategy(this.tp0));
        this.time.sleep(500L);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-2L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionResetToEarliestOffset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.EARLIEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-2L, 0), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testresetPositionsMetadataRefresh() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L, 0), (AbstractResponse)this.listOffsetResponse(Errors.NOT_LEADER_OR_FOLLOWER, 1L, 5L), false);
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        this.client.prepareMetadataUpdate(this.initialUpdateResponse);
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.client.hasPendingMetadataUpdates());
        this.time.sleep(100L);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testListOffsetNoUpdateMissingEpoch() {
        this.buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        MetadataResponse metadataWithNoLeaderEpochs = RequestTestUtils.metadataUpdateWithIds("kafka-cluster", 1, Collections.emptyMap(), Collections.singletonMap("test", 4), tp -> null, this.topicIds);
        this.client.updateMetadata(metadataWithNoLeaderEpochs);
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(this.tp0, Errors.NONE, 1L, 5L, 1));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertFalse((boolean)this.metadata.updateRequested());
        Assertions.assertFalse((boolean)this.metadata.lastSeenLeaderEpoch(this.tp0).isPresent());
    }

    @Test
    public void testListOffsetUpdateEpoch() {
        this.buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        MetadataResponse metadataWithLeaderEpochs = RequestTestUtils.metadataUpdateWithIds("kafka-cluster", 1, Collections.emptyMap(), Collections.singletonMap("test", 4), tp -> 1, this.topicIds);
        this.client.updateMetadata(metadataWithLeaderEpochs);
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L, 1), (AbstractResponse)this.listOffsetResponse(this.tp0, Errors.NONE, 1L, 5L, 2));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertTrue((boolean)this.metadata.updateRequested());
        TestUtils.assertOptional(this.metadata.lastSeenLeaderEpoch(this.tp0), epoch -> Assertions.assertEquals((long)epoch.intValue(), (long)2L));
    }

    @Test
    public void testUpdateFetchPositionDisconnect() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L, 0), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L), true);
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        this.client.prepareMetadataUpdate(this.initialUpdateResponse);
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.client.hasPendingMetadataUpdates());
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.client.hasInFlightRequests());
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        this.time.sleep(100L);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testAssignmentChangeWithInFlightReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertTrue((boolean)this.client.hasInFlightRequests());
        this.assignFromUser(Collections.singleton(this.tp1));
        this.client.respond((AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.client.hasPendingResponses());
        Assertions.assertFalse((boolean)this.client.hasInFlightRequests());
        Assertions.assertFalse((boolean)this.subscriptions.isAssigned(this.tp0));
    }

    @Test
    public void testSeekWithInFlightReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertTrue((boolean)this.client.hasInFlightRequests());
        this.subscriptions.seek(this.tp0, 237L);
        this.client.respond((AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.client.hasPendingResponses());
        Assertions.assertFalse((boolean)this.client.hasInFlightRequests());
        Assertions.assertEquals((long)237L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    private boolean listOffsetMatchesExpectedReset(TopicPartition tp, AutoOffsetResetStrategy strategy, AbstractRequest request) {
        Assertions.assertInstanceOf(ListOffsetsRequest.class, (Object)request);
        ListOffsetsRequest req = (ListOffsetsRequest)request;
        Assertions.assertEquals(Collections.singleton(tp.topic()), req.data().topics().stream().map(ListOffsetsRequestData.ListOffsetsTopic::name).collect(Collectors.toSet()));
        ListOffsetsRequestData.ListOffsetsTopic listTopic = (ListOffsetsRequestData.ListOffsetsTopic)req.data().topics().get(0);
        Assertions.assertEquals(Collections.singleton(tp.partition()), listTopic.partitions().stream().map(ListOffsetsRequestData.ListOffsetsPartition::partitionIndex).collect(Collectors.toSet()));
        ListOffsetsRequestData.ListOffsetsPartition listPartition = (ListOffsetsRequestData.ListOffsetsPartition)listTopic.partitions().get(0);
        if (strategy == AutoOffsetResetStrategy.EARLIEST) {
            Assertions.assertEquals((long)-2L, (long)listPartition.timestamp());
        } else if (strategy == AutoOffsetResetStrategy.LATEST) {
            Assertions.assertEquals((long)-1L, (long)listPartition.timestamp());
        }
        return true;
    }

    @Test
    public void testEarlierOffsetResetArrivesLate() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.EARLIEST);
        this.offsetFetcher.resetPositionsIfNeeded();
        this.client.prepareResponse(req -> {
            if (this.listOffsetMatchesExpectedReset(this.tp0, AutoOffsetResetStrategy.EARLIEST, req)) {
                this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
                return true;
            }
            return false;
        }, (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 0L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertEquals((Object)AutoOffsetResetStrategy.LATEST, (Object)this.subscriptions.resetStrategy(this.tp0));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.client.prepareResponse(req -> this.listOffsetMatchesExpectedReset(this.tp0, AutoOffsetResetStrategy.LATEST, req), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 10L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertEquals((long)10L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testChangeResetWithInFlightReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertTrue((boolean)this.client.hasInFlightRequests());
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.EARLIEST);
        this.client.respond((AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.client.hasPendingResponses());
        Assertions.assertFalse((boolean)this.client.hasInFlightRequests());
        Assertions.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertEquals((Object)AutoOffsetResetStrategy.EARLIEST, (Object)this.subscriptions.resetStrategy(this.tp0));
    }

    @Test
    public void testIdempotentResetWithInFlightReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertTrue((boolean)this.client.hasInFlightRequests());
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.client.respond((AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.client.hasInFlightRequests());
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testResetOffsetsAuthorizationFailure() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L, 0), (AbstractResponse)this.listOffsetResponse(Errors.TOPIC_AUTHORIZATION_FAILED, -1L, -1L), false);
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        try {
            this.offsetFetcher.resetPositionsIfNeeded();
            Assertions.fail((String)"Expected authorization error to be raised");
        }
        catch (TopicAuthorizationException e) {
            Assertions.assertEquals(Collections.singleton(this.tp0.topic()), (Object)e.unauthorizedTopics());
        }
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.client.hasInFlightRequests());
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        this.time.sleep(100L);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertEquals((long)5L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testFetchingPendingPartitionsBeforeAndAfterSubscriptionReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 100L);
        Assertions.assertEquals((long)100L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
        this.subscriptions.markPendingRevocation(Collections.singleton(this.tp0));
        this.offsetFetcher.resetPositionsIfNeeded();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertEquals((long)100L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        this.subscriptions.seek(this.tp0, 100L);
        Assertions.assertEquals((long)100L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        this.subscriptions.unsubscribe();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 100L);
        Assertions.assertEquals((long)100L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
        Assertions.assertTrue((boolean)this.subscriptions.isFetchable(this.tp0));
    }

    @Test
    public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.pause(this.tp0);
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L, 0), (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 10L));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertEquals((long)10L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0);
        this.subscriptions.pause(this.tp0);
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
    }

    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.seek(this.tp0, 10L);
        this.subscriptions.pause(this.tp0);
        this.offsetFetcher.resetPositionsIfNeeded();
        Assertions.assertFalse((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertTrue((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertEquals((long)10L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testGetOffsetsForTimesTimeout() {
        this.buildFetcher();
        Assertions.assertThrows(TimeoutException.class, () -> this.offsetFetcher.offsetsForTimes(Collections.singletonMap(new TopicPartition("test", 2), 1000L), this.time.timer(100L)));
    }

    @Test
    public void testGetOffsetsForTimes() {
        this.buildFetcher();
        Assertions.assertTrue((boolean)this.offsetFetcher.offsetsForTimes(new HashMap(), this.time.timer(100L)).isEmpty());
        this.testGetOffsetsForTimesWithUnknownOffset();
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, null);
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, 10L, 10L);
        this.testGetOffsetsForTimesWithError(Errors.NOT_LEADER_OR_FOLLOWER, Errors.INVALID_REQUEST, 10L, 10L);
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER, 10L, 10L);
        this.testGetOffsetsForTimesWithError(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NONE, 10L, 10L);
        this.testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 10L);
        this.testGetOffsetsForTimesWithError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Errors.NONE, 10L, null);
        this.testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 10L);
    }

    @Test
    public void testGetOffsetsFencedLeaderEpoch() {
        this.buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(this.initialUpdateResponse);
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.client.prepareResponse((AbstractResponse)this.listOffsetResponse(Errors.FENCED_LEADER_EPOCH, 1L, 5L));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertEquals((long)0L, (long)this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
        List<Errors> retriableErrors = Arrays.asList(Errors.NOT_LEADER_OR_FOLLOWER, Errors.REPLICA_NOT_AVAILABLE, Errors.KAFKA_STORAGE_ERROR, Errors.OFFSET_NOT_AVAILABLE, Errors.LEADER_NOT_AVAILABLE, Errors.FENCED_LEADER_EPOCH, Errors.UNKNOWN_LEADER_EPOCH);
        int newLeaderEpoch = 3;
        MetadataResponse updatedMetadata = RequestTestUtils.metadataUpdateWithIds("dummy", 3, Collections.singletonMap("test", Errors.NONE), Collections.singletonMap("test", 4), tp -> 3, this.topicIds);
        Node originalLeader = this.initialUpdateResponse.buildCluster().leaderFor(this.tp1);
        Node newLeader = updatedMetadata.buildCluster().leaderFor(this.tp1);
        Assertions.assertNotEquals((Object)originalLeader, (Object)newLeader);
        for (Errors retriableError : retriableErrors) {
            this.buildFetcher();
            this.subscriptions.assignFromUser(Set.of(this.tp0, this.tp1));
            this.client.updateMetadata(this.initialUpdateResponse);
            long fetchTimestamp = 10L;
            ListOffsetsResponseData.ListOffsetsPartitionResponse tp0NoError = new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(this.tp0.partition()).setErrorCode(Errors.NONE.code()).setTimestamp(10L).setOffset(4L);
            List<ListOffsetsResponseData.ListOffsetsTopicResponse> topics = Collections.singletonList(new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(this.tp0.topic()).setPartitions(Arrays.asList(tp0NoError, new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(this.tp1.partition()).setErrorCode(retriableError.code()).setTimestamp(-1L).setOffset(-1L))));
            ListOffsetsResponseData data = new ListOffsetsResponseData().setThrottleTimeMs(0).setTopics(topics);
            this.client.prepareResponseFrom(body -> {
                boolean isListOffsetRequest = body instanceof ListOffsetsRequest;
                if (isListOffsetRequest) {
                    ListOffsetsRequest request = (ListOffsetsRequest)body;
                    List<ListOffsetsRequestData.ListOffsetsTopic> expectedTopics = Collections.singletonList(new ListOffsetsRequestData.ListOffsetsTopic().setName(this.tp0.topic()).setPartitions(Arrays.asList(new ListOffsetsRequestData.ListOffsetsPartition().setPartitionIndex(this.tp1.partition()).setTimestamp(10L).setCurrentLeaderEpoch(-1), new ListOffsetsRequestData.ListOffsetsPartition().setPartitionIndex(this.tp0.partition()).setTimestamp(10L).setCurrentLeaderEpoch(-1))));
                    return request.topics().equals(expectedTopics);
                }
                return false;
            }, (AbstractResponse)new ListOffsetsResponse(data), originalLeader);
            this.client.prepareMetadataUpdate(updatedMetadata);
            List<ListOffsetsResponseData.ListOffsetsTopicResponse> topicsWithFatalError = Collections.singletonList(new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(this.tp0.topic()).setPartitions(Arrays.asList(tp0NoError, new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(this.tp1.partition()).setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()).setTimestamp(-1L).setOffset(-1L))));
            ListOffsetsResponseData dataWithFatalError = new ListOffsetsResponseData().setThrottleTimeMs(0).setTopics(topicsWithFatalError);
            this.client.prepareResponseFrom((AbstractResponse)new ListOffsetsResponse(dataWithFatalError), originalLeader);
            this.client.prepareResponseFrom(body -> {
                boolean isListOffsetRequest = body instanceof ListOffsetsRequest;
                if (isListOffsetRequest) {
                    ListOffsetsRequest request = (ListOffsetsRequest)body;
                    ListOffsetsRequestData.ListOffsetsTopic requestTopic = (ListOffsetsRequestData.ListOffsetsTopic)request.topics().get(0);
                    ListOffsetsRequestData.ListOffsetsPartition expectedPartition = new ListOffsetsRequestData.ListOffsetsPartition().setPartitionIndex(this.tp1.partition()).setTimestamp(10L).setCurrentLeaderEpoch(3);
                    return expectedPartition.equals(requestTopic.partitions().get(0));
                }
                return false;
            }, (AbstractResponse)this.listOffsetResponse(this.tp1, Errors.NONE, 10L, 5L), newLeader);
            Map offsetAndTimestampMap = this.offsetFetcher.offsetsForTimes(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.tp0, (Object)10L), Utils.mkEntry((Object)this.tp1, (Object)10L)}), this.time.timer(Integer.MAX_VALUE));
            Assertions.assertEquals((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.tp0, (Object)new OffsetAndTimestamp(4L, 10L)), Utils.mkEntry((Object)this.tp1, (Object)new OffsetAndTimestamp(5L, 10L))}), (Object)offsetAndTimestampMap);
            Assertions.assertEquals((int)1, (int)this.client.numAwaitingResponses());
        }
    }

    @Test
    public void testGetOffsetsUnknownLeaderEpoch() {
        this.buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.subscriptions.requestOffsetReset(this.tp0, AutoOffsetResetStrategy.LATEST);
        this.client.prepareResponse((AbstractResponse)this.listOffsetResponse(Errors.UNKNOWN_LEADER_EPOCH, 1L, 5L));
        this.offsetFetcher.resetPositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertTrue((boolean)this.subscriptions.isOffsetResetNeeded(this.tp0));
        Assertions.assertFalse((boolean)this.subscriptions.isFetchable(this.tp0));
        Assertions.assertFalse((boolean)this.subscriptions.hasValidPosition(this.tp0));
        Assertions.assertEquals((long)0L, (long)this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    @Test
    public void testGetOffsetsIncludesLeaderEpoch() {
        this.buildFetcher();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp0));
        this.client.updateMetadata(this.initialUpdateResponse);
        MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), Collections.singletonMap("test", 4), tp -> 99, this.topicIds);
        this.client.updateMetadata(metadataResponse);
        this.subscriptions.requestOffsetReset(this.tp0);
        this.offsetFetcher.resetPositionsIfNeeded();
        MockClient.RequestMatcher matcher = body -> {
            if (body instanceof ListOffsetsRequest) {
                ListOffsetsRequest offsetRequest = (ListOffsetsRequest)body;
                int epoch = ((ListOffsetsRequestData.ListOffsetsPartition)((ListOffsetsRequestData.ListOffsetsTopic)offsetRequest.topics().get(0)).partitions().get(0)).currentLeaderEpoch();
                Assertions.assertTrue((epoch != -1 ? 1 : 0) != 0, (String)"Expected Fetcher to set leader epoch in request");
                Assertions.assertEquals((int)epoch, (int)99, (String)"Expected leader epoch to match epoch from metadata update");
                return true;
            }
            Assertions.fail((String)"Should have seen ListOffsetRequest");
            return false;
        };
        this.client.prepareResponse(matcher, (AbstractResponse)this.listOffsetResponse(Errors.NONE, 1L, 5L));
        this.consumerClient.pollNoWakeup();
    }

    @Test
    public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersNotKnownInitially() {
        this.buildFetcher();
        this.subscriptions.assignFromUser(Set.of(this.tp0, this.tp1));
        String anotherTopic = "another-topic";
        TopicPartition t2p0 = new TopicPartition("another-topic", 0);
        this.client.reset();
        MetadataResponse initialMetadata = RequestTestUtils.metadataUpdateWithIds(3, Collections.singletonMap("test", 2), this.topicIds);
        this.client.updateMetadata(initialMetadata);
        this.client.prepareMetadataUpdate(initialMetadata);
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(this.tp0, Errors.NONE, 1000L, 11L), this.metadata.fetch().leaderFor(this.tp0));
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(this.tp1, Errors.NONE, 1000L, 32L), this.metadata.fetch().leaderFor(this.tp1));
        HashMap<String, Integer> partitionNumByTopic = new HashMap<String, Integer>();
        partitionNumByTopic.put("test", 2);
        partitionNumByTopic.put("another-topic", 1);
        this.topicIds.put("another-topic", Uuid.randomUuid());
        MetadataResponse updatedMetadata = RequestTestUtils.metadataUpdateWithIds(3, partitionNumByTopic, this.topicIds);
        this.client.prepareMetadataUpdate(updatedMetadata);
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(t2p0, Errors.NONE, 1000L, 54L), this.metadata.fetch().leaderFor(t2p0));
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<TopicPartition, Long>();
        timestampToSearch.put(this.tp0, -1L);
        timestampToSearch.put(this.tp1, -1L);
        timestampToSearch.put(t2p0, -1L);
        Map offsetAndTimestampMap = this.offsetFetcher.offsetsForTimes(timestampToSearch, this.time.timer(Long.MAX_VALUE));
        Assertions.assertNotNull(offsetAndTimestampMap.get(this.tp0), (String)("Expect MetadataFetcher.offsetsForTimes() to return non-null result for " + String.valueOf(this.tp0)));
        Assertions.assertNotNull(offsetAndTimestampMap.get(this.tp1), (String)("Expect MetadataFetcher.offsetsForTimes() to return non-null result for " + String.valueOf(this.tp1)));
        Assertions.assertNotNull(offsetAndTimestampMap.get(t2p0), (String)("Expect MetadataFetcher.offsetsForTimes() to return non-null result for " + String.valueOf(t2p0)));
        Assertions.assertEquals((long)11L, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(this.tp0)).offset());
        Assertions.assertEquals((long)32L, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(this.tp1)).offset());
        Assertions.assertEquals((long)54L, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(t2p0)).offset());
    }

    @Test
    public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersDisconnectException() {
        this.buildFetcher();
        String anotherTopic = "another-topic";
        TopicPartition t2p0 = new TopicPartition("another-topic", 0);
        this.subscriptions.assignFromUser(Set.of(this.tp0, t2p0));
        this.client.reset();
        MetadataResponse initialMetadata = RequestTestUtils.metadataUpdateWithIds(1, Collections.singletonMap("test", 1), this.topicIds);
        this.client.updateMetadata(initialMetadata);
        HashMap<String, Integer> partitionNumByTopic = new HashMap<String, Integer>();
        partitionNumByTopic.put("test", 1);
        partitionNumByTopic.put("another-topic", 1);
        this.topicIds.put("another-topic", Uuid.randomUuid());
        MetadataResponse updatedMetadata = RequestTestUtils.metadataUpdateWithIds(1, partitionNumByTopic, this.topicIds);
        this.client.prepareMetadataUpdate(updatedMetadata);
        this.client.prepareResponse(this.listOffsetRequestMatcher(-1L), (AbstractResponse)this.listOffsetResponse(this.tp0, Errors.NONE, 1000L, 11L), true);
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(this.tp0, Errors.NONE, 1000L, 11L), this.metadata.fetch().leaderFor(this.tp0));
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<TopicPartition, Long>();
        timestampToSearch.put(this.tp0, -1L);
        Map offsetAndTimestampMap = this.offsetFetcher.offsetsForTimes(timestampToSearch, this.time.timer(Long.MAX_VALUE));
        Assertions.assertNotNull(offsetAndTimestampMap.get(this.tp0), (String)("Expect MetadataFetcher.offsetsForTimes() to return non-null result for " + String.valueOf(this.tp0)));
        Assertions.assertEquals((long)11L, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(this.tp0)).offset());
        Assertions.assertNotNull((Object)this.metadata.fetch().partitionCountForTopic("another-topic"));
    }

    @Test
    public void testListOffsetsWithZeroTimeout() {
        this.buildFetcher();
        HashMap<TopicPartition, Long> offsetsToSearch = new HashMap<TopicPartition, Long>();
        offsetsToSearch.put(this.tp0, -2L);
        offsetsToSearch.put(this.tp1, -2L);
        HashMap<TopicPartition, Object> offsetsToExpect = new HashMap<TopicPartition, Object>();
        offsetsToExpect.put(this.tp0, null);
        offsetsToExpect.put(this.tp1, null);
        Assertions.assertEquals(offsetsToExpect, (Object)this.offsetFetcher.offsetsForTimes(offsetsToSearch, this.time.timer(0L)));
    }

    @Test
    public void testBatchedListOffsetsMetadataErrors() {
        this.buildFetcher();
        ListOffsetsResponseData data = new ListOffsetsResponseData().setThrottleTimeMs(0).setTopics(Collections.singletonList(new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(this.tp0.topic()).setPartitions(Arrays.asList(new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(this.tp0.partition()).setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()).setTimestamp(-1L).setOffset(-1L), new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(this.tp1.partition()).setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).setTimestamp(-1L).setOffset(-1L)))));
        this.client.prepareResponse((AbstractResponse)new ListOffsetsResponse(data));
        HashMap<TopicPartition, Long> offsetsToSearch = new HashMap<TopicPartition, Long>();
        offsetsToSearch.put(this.tp0, -2L);
        offsetsToSearch.put(this.tp1, -2L);
        Assertions.assertThrows(TimeoutException.class, () -> this.offsetFetcher.offsetsForTimes(offsetsToSearch, this.time.timer(1L)));
    }

    private void testGetOffsetsForTimesWithError(Errors errorForP0, Errors errorForP1, long offsetForP0, Long expectedOffsetForP0) {
        long offsetForP1 = 100L;
        long expectedOffsetForP1 = 100L;
        this.client.reset();
        String topicName2 = "topic2";
        TopicPartition t2p0 = new TopicPartition(topicName2, 0);
        this.metadata.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"), (ClientDnsLookup)ClientDnsLookup.USE_ALL_DNS_IPS));
        HashMap<String, Integer> partitionNumByTopic = new HashMap<String, Integer>();
        partitionNumByTopic.put("test", 2);
        partitionNumByTopic.put(topicName2, 1);
        MetadataResponse updateMetadataResponse = RequestTestUtils.metadataUpdateWithIds(2, partitionNumByTopic, this.topicIds);
        Cluster updatedCluster = updateMetadataResponse.buildCluster();
        this.client.prepareMetadataUpdate(updateMetadataResponse, true);
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(t2p0, errorForP0, offsetForP0, offsetForP0), updatedCluster.leaderFor(t2p0));
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(this.tp1, errorForP1, offsetForP1, offsetForP1), updatedCluster.leaderFor(this.tp1));
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(t2p0, Errors.NONE, offsetForP0, offsetForP0), updatedCluster.leaderFor(t2p0));
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(this.tp1, Errors.NONE, offsetForP1, offsetForP1), updatedCluster.leaderFor(this.tp1));
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<TopicPartition, Long>();
        timestampToSearch.put(t2p0, 0L);
        timestampToSearch.put(this.tp1, 0L);
        Map offsetAndTimestampMap = this.offsetFetcher.offsetsForTimes(timestampToSearch, this.time.timer(Long.MAX_VALUE));
        if (expectedOffsetForP0 == null) {
            Assertions.assertNull(offsetAndTimestampMap.get(t2p0));
        } else {
            Assertions.assertEquals((long)expectedOffsetForP0, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(t2p0)).timestamp());
            Assertions.assertEquals((long)expectedOffsetForP0, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(t2p0)).offset());
        }
        Assertions.assertEquals((long)expectedOffsetForP1, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(this.tp1)).timestamp());
        Assertions.assertEquals((long)expectedOffsetForP1, (long)((OffsetAndTimestamp)offsetAndTimestampMap.get(this.tp1)).offset());
    }

    private void testGetOffsetsForTimesWithUnknownOffset() {
        this.client.reset();
        MetadataResponse initialMetadataUpdate = RequestTestUtils.metadataUpdateWithIds(1, Collections.singletonMap("test", 1), this.topicIds);
        this.client.updateMetadata(initialMetadataUpdate);
        ListOffsetsResponseData data = new ListOffsetsResponseData().setThrottleTimeMs(0).setTopics(Collections.singletonList(new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(this.tp0.topic()).setPartitions(Collections.singletonList(new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(this.tp0.partition()).setErrorCode(Errors.NONE.code()).setTimestamp(-1L).setOffset(-1L)))));
        this.client.prepareResponseFrom((AbstractResponse)new ListOffsetsResponse(data), this.metadata.fetch().leaderFor(this.tp0));
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<TopicPartition, Long>();
        timestampToSearch.put(this.tp0, 0L);
        Map offsetAndTimestampMap = this.offsetFetcher.offsetsForTimes(timestampToSearch, this.time.timer(Long.MAX_VALUE));
        Assertions.assertTrue((boolean)offsetAndTimestampMap.containsKey(this.tp0));
        Assertions.assertNull(offsetAndTimestampMap.get(this.tp0));
    }

    @Test
    public void testOffsetValidationRequestGrouping() {
        this.buildFetcher();
        this.assignFromUser(Set.of(this.tp0, this.tp1, this.tp2, this.tp3));
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 3, Collections.emptyMap(), Collections.singletonMap("test", 4), tp -> 5, this.topicIds), false, 0L);
        for (TopicPartition tp2 : this.subscriptions.assignedPartitions()) {
            Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader((TopicPartition)tp2).leader, Optional.of(4));
            this.subscriptions.seekUnvalidated(tp2, new SubscriptionState.FetchPosition(0L, Optional.of(4), leaderAndEpoch));
        }
        HashSet<TopicPartition> allRequestedPartitions = new HashSet<TopicPartition>();
        for (Node node : this.metadata.fetch().nodes()) {
            this.apiVersions.update(node.idString(), NodeApiVersions.create());
            Set<TopicPartition> expectedPartitions = this.subscriptions.assignedPartitions().stream().filter(tp -> this.metadata.currentLeader((TopicPartition)tp).leader.equals(Optional.of(node))).collect(Collectors.toSet());
            Assertions.assertTrue((boolean)expectedPartitions.stream().noneMatch(allRequestedPartitions::contains));
            Assertions.assertFalse((boolean)expectedPartitions.isEmpty());
            allRequestedPartitions.addAll(expectedPartitions);
            OffsetForLeaderEpochResponseData data = new OffsetForLeaderEpochResponseData();
            expectedPartitions.forEach(tp -> {
                OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult topic = data.topics().find(tp.topic());
                if (topic == null) {
                    topic = new OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult().setTopic(tp.topic());
                    data.topics().add((ImplicitLinkedHashCollection.Element)topic);
                }
                topic.partitions().add(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(tp.partition()).setErrorCode(Errors.NONE.code()).setLeaderEpoch(4).setEndOffset(0L));
            });
            OffsetsForLeaderEpochResponse response = new OffsetsForLeaderEpochResponse(data);
            this.client.prepareResponseFrom(body -> {
                OffsetsForLeaderEpochRequest request = (OffsetsForLeaderEpochRequest)body;
                return expectedPartitions.equals(this.offsetForLeaderPartitionMap(request.data()).keySet());
            }, (AbstractResponse)response, node);
        }
        Assertions.assertEquals((Object)this.subscriptions.assignedPartitions(), allRequestedPartitions);
        this.offsetFetcher.validatePositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertTrue((boolean)this.subscriptions.assignedPartitions().stream().noneMatch(arg_0 -> ((SubscriptionState)this.subscriptions).awaitingValidation(arg_0)));
    }

    @Test
    public void testOffsetValidationAwaitsNodeApiVersion() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        boolean epochOne = true;
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 1, this.topicIds), false, 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        Assertions.assertFalse((boolean)this.client.isConnected(node.idString()));
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader((TopicPartition)this.tp0).leader, Optional.of(1));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(20L, Optional.of(1), leaderAndEpoch));
        Assertions.assertFalse((boolean)this.client.isConnected(node.idString()));
        Assertions.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
        this.offsetFetcher.validatePositionsIfNeeded();
        Assertions.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
        Assertions.assertTrue((boolean)this.client.isConnected(node.idString()));
        this.apiVersions.update(node.idString(), NodeApiVersions.create());
        this.client.prepareResponseFrom((AbstractResponse)this.prepareOffsetsForLeaderEpochResponse(this.tp0, 1, 30L), node);
        this.offsetFetcher.validatePositionsIfNeeded();
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.awaitingValidation(this.tp0));
        Assertions.assertEquals((long)20L, (long)this.subscriptions.position((TopicPartition)this.tp0).offset);
    }

    @Test
    public void testOffsetValidationSkippedForOldBroker() {
        IsolationLevel isolationLevel = IsolationLevel.READ_UNCOMMITTED;
        int maxPollRecords = Integer.MAX_VALUE;
        long metadataExpireMs = Long.MAX_VALUE;
        AutoOffsetResetStrategy offsetResetStrategy = AutoOffsetResetStrategy.EARLIEST;
        int minBytes = 1;
        int maxBytes = Integer.MAX_VALUE;
        int maxWaitMs = 0;
        int fetchSize = 1000;
        MetricConfig metricConfig = new MetricConfig();
        LogContext logContext = new LogContext();
        SubscriptionState subscriptionState = new SubscriptionState(logContext, offsetResetStrategy);
        this.buildFetcher(metricConfig, isolationLevel, metadataExpireMs, subscriptionState, logContext);
        FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry(metricConfig.tags().keySet(), "consumertest-group");
        FetchConfig fetchConfig = new FetchConfig(minBytes, maxBytes, maxWaitMs, fetchSize, maxPollRecords, true, "", isolationLevel);
        Fetcher fetcher = new Fetcher(logContext, this.consumerClient, this.metadata, this.subscriptions, fetchConfig, new Deserializers((Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer(), this.metrics), new FetchMetricsManager(this.metrics, metricsRegistry), (Time)this.time, this.apiVersions);
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        boolean epochOne = true;
        int epochTwo = 2;
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 1, this.topicIds), false, 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.apiVersions.update(node.idString(), NodeApiVersions.create((short)ApiKeys.OFFSET_FOR_LEADER_EPOCH.id, (short)0, (short)2));
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader((TopicPartition)this.tp0).leader, Optional.of(1));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(1), leaderAndEpoch));
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 2, this.topicIds), false, 0L);
        this.offsetFetcher.validatePositionsIfNeeded();
        Assertions.assertFalse((boolean)this.subscriptions.awaitingValidation(this.tp0));
        leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader((TopicPartition)this.tp0).leader, Optional.of(1));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(1), leaderAndEpoch));
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 2, this.topicIds), false, 0L);
        this.offsetFetcher.validatePositionsOnMetadataChange();
        Assertions.assertEquals((int)1, (int)fetcher.sendFetches());
        Assertions.assertFalse((boolean)this.subscriptions.awaitingValidation(this.tp0));
    }

    @Test
    public void testOffsetValidationSkippedForOldResponse() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        boolean epochOne = true;
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 1, this.topicIds), false, 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        Assertions.assertFalse((boolean)this.client.isConnected(node.idString()));
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader((TopicPartition)this.tp0).leader, Optional.of(1));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(20L, Optional.of(1), leaderAndEpoch));
        Assertions.assertFalse((boolean)this.client.isConnected(node.idString()));
        Assertions.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
        int responseVersion = 8;
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> null, MetadataResponse.PartitionMetadata::new, (short)8, this.topicIds), false, 0L);
        this.offsetFetcher.validatePositionsIfNeeded();
        Assertions.assertFalse((boolean)this.subscriptions.awaitingValidation(this.tp0));
    }

    @Test
    public void testOffsetValidationresetPositionForUndefinedEpochWithDefinedResetPolicy() {
        this.testOffsetValidationWithGivenEpochOffset(-1, 0L, AutoOffsetResetStrategy.EARLIEST);
    }

    @Test
    public void testOffsetValidationresetPositionForUndefinedOffsetWithDefinedResetPolicy() {
        this.testOffsetValidationWithGivenEpochOffset(2, -1L, AutoOffsetResetStrategy.EARLIEST);
    }

    @Test
    public void testOffsetValidationresetPositionForUndefinedEpochWithUndefinedResetPolicy() {
        this.testOffsetValidationWithGivenEpochOffset(-1, 0L, AutoOffsetResetStrategy.NONE);
    }

    @Test
    public void testOffsetValidationresetPositionForUndefinedOffsetWithUndefinedResetPolicy() {
        this.testOffsetValidationWithGivenEpochOffset(2, -1L, AutoOffsetResetStrategy.NONE);
    }

    @Test
    public void testOffsetValidationTriggerLogTruncationForBadOffsetWithUndefinedResetPolicy() {
        this.testOffsetValidationWithGivenEpochOffset(1, 1L, AutoOffsetResetStrategy.NONE);
    }

    private void testOffsetValidationWithGivenEpochOffset(int leaderEpoch, long endOffset, AutoOffsetResetStrategy offsetResetStrategy) {
        this.buildFetcher(offsetResetStrategy);
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        boolean epochOne = true;
        long initialOffset = 5L;
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 1, this.topicIds), false, 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.apiVersions.update(node.idString(), NodeApiVersions.create());
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader((TopicPartition)this.tp0).leader, Optional.of(1));
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(5L, Optional.of(1), leaderAndEpoch));
        this.offsetFetcher.validatePositionsIfNeeded();
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
        Assertions.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
        Assertions.assertTrue((boolean)this.client.hasInFlightRequests());
        this.client.respond(this.offsetsForLeaderEpochRequestMatcher(this.tp0), (AbstractResponse)this.prepareOffsetsForLeaderEpochResponse(this.tp0, leaderEpoch, endOffset));
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
        if (offsetResetStrategy == AutoOffsetResetStrategy.NONE) {
            LogTruncationException thrown = (LogTruncationException)Assertions.assertThrows(LogTruncationException.class, () -> this.offsetFetcher.validatePositionsIfNeeded());
            Assertions.assertEquals(Collections.singletonMap(this.tp0, 5L), (Object)thrown.offsetOutOfRangePartitions());
            if (endOffset == -1L || leaderEpoch == -1) {
                Assertions.assertEquals(Collections.emptyMap(), (Object)thrown.divergentOffsets());
            } else {
                OffsetAndMetadata expectedDivergentOffset = new OffsetAndMetadata(endOffset, Optional.of(leaderEpoch), "");
                Assertions.assertEquals(Collections.singletonMap(this.tp0, expectedDivergentOffset), (Object)thrown.divergentOffsets());
            }
            Assertions.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
        } else {
            this.offsetFetcher.validatePositionsIfNeeded();
            Assertions.assertFalse((boolean)this.subscriptions.awaitingValidation(this.tp0));
        }
    }

    @Test
    public void testOffsetValidationHandlesSeekWithInflightOffsetForLeaderRequest() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        boolean epochOne = true;
        Optional<Integer> epochOneOpt = Optional.of(1);
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 1, this.topicIds), false, 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.apiVersions.update(node.idString(), NodeApiVersions.create());
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader((TopicPartition)this.tp0).leader, epochOneOpt);
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(0L, epochOneOpt, leaderAndEpoch));
        this.offsetFetcher.validatePositionsIfNeeded();
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
        Assertions.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
        Assertions.assertTrue((boolean)this.client.hasInFlightRequests());
        this.subscriptions.seekUnvalidated(this.tp0, new SubscriptionState.FetchPosition(5L, epochOneOpt, leaderAndEpoch));
        Assertions.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
        this.client.respond(this.offsetsForLeaderEpochRequestMatcher(this.tp0), (AbstractResponse)this.prepareOffsetsForLeaderEpochResponse(this.tp0, 0, 0L));
        this.consumerClient.poll(this.time.timer(Duration.ZERO));
        Assertions.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
    }

    @Test
    public void testOffsetValidationFencing() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        HashMap<String, Integer> partitionCounts = new HashMap<String, Integer>();
        partitionCounts.put(this.tp0.topic(), 4);
        boolean epochOne = true;
        int epochTwo = 2;
        int epochThree = 3;
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 1, this.topicIds), false, 0L);
        Node node = (Node)this.metadata.fetch().nodes().get(0);
        this.apiVersions.update(node.idString(), NodeApiVersions.create());
        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(this.metadata.currentLeader((TopicPartition)this.tp0).leader, Optional.of(1));
        this.subscriptions.seekValidated(this.tp0, new SubscriptionState.FetchPosition(0L, Optional.of(1), leaderAndEpoch));
        this.metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 2, this.topicIds), false, 0L);
        this.offsetFetcher.validatePositionsIfNeeded();
        Assertions.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0));
        this.subscriptions.completeValidation(this.tp0);
        SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(10L, Optional.of(2), new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(2)));
        this.subscriptions.position(this.tp0, nextPosition);
        this.subscriptions.maybeValidatePositionForCurrentLeader(this.apiVersions, this.tp0, new Metadata.LeaderAndEpoch(leaderAndEpoch.leader, Optional.of(3)));
        this.client.prepareResponse((AbstractResponse)this.prepareOffsetsForLeaderEpochResponse(this.tp0, 2, 10L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertTrue((boolean)this.subscriptions.awaitingValidation(this.tp0), (String)"Expected validation to fail since leader epoch changed");
        this.offsetFetcher.validatePositionsIfNeeded();
        this.client.prepareResponse((AbstractResponse)this.prepareOffsetsForLeaderEpochResponse(this.tp0, 3, 10L));
        this.consumerClient.pollNoWakeup();
        Assertions.assertFalse((boolean)this.subscriptions.awaitingValidation(this.tp0), (String)"Expected validation to succeed with latest epoch");
    }

    @Test
    public void testBeginningOffsets() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse((AbstractResponse)this.listOffsetResponse(this.tp0, Errors.NONE, -2L, 2L));
        Assertions.assertEquals(Collections.singletonMap(this.tp0, 2L), (Object)this.offsetFetcher.beginningOffsets(Collections.singleton(this.tp0), this.time.timer(5000L)));
    }

    @Test
    public void testBeginningOffsetsDuplicateTopicPartition() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse((AbstractResponse)this.listOffsetResponse(this.tp0, Errors.NONE, -2L, 2L));
        Assertions.assertEquals(Collections.singletonMap(this.tp0, 2L), (Object)this.offsetFetcher.beginningOffsets(Arrays.asList(this.tp0, this.tp0), this.time.timer(5000L)));
    }

    @Test
    public void testBeginningOffsetsMultipleTopicPartitions() {
        this.buildFetcher();
        HashMap<TopicPartition, Long> expectedOffsets = new HashMap<TopicPartition, Long>();
        expectedOffsets.put(this.tp0, 2L);
        expectedOffsets.put(this.tp1, 4L);
        expectedOffsets.put(this.tp2, 6L);
        this.assignFromUser(expectedOffsets.keySet());
        this.client.prepareResponse((AbstractResponse)this.listOffsetResponse(expectedOffsets, Errors.NONE, -2L, -1));
        Assertions.assertEquals(expectedOffsets, (Object)this.offsetFetcher.beginningOffsets(Arrays.asList(this.tp0, this.tp1, this.tp2), this.time.timer(5000L)));
    }

    @Test
    public void testBeginningOffsetsEmpty() {
        this.buildFetcher();
        Assertions.assertEquals(Collections.emptyMap(), (Object)this.offsetFetcher.beginningOffsets(Collections.emptyList(), this.time.timer(5000L)));
    }

    @Test
    public void testEndOffsets() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse((AbstractResponse)this.listOffsetResponse(this.tp0, Errors.NONE, -1L, 5L));
        Assertions.assertEquals(Collections.singletonMap(this.tp0, 5L), (Object)this.offsetFetcher.endOffsets(Collections.singleton(this.tp0), this.time.timer(5000L)));
    }

    @Test
    public void testEndOffsetsDuplicateTopicPartition() {
        this.buildFetcher();
        this.assignFromUser(Collections.singleton(this.tp0));
        this.client.prepareResponse((AbstractResponse)this.listOffsetResponse(this.tp0, Errors.NONE, -1L, 5L));
        Assertions.assertEquals(Collections.singletonMap(this.tp0, 5L), (Object)this.offsetFetcher.endOffsets(Arrays.asList(this.tp0, this.tp0), this.time.timer(5000L)));
    }

    @Test
    public void testEndOffsetsMultipleTopicPartitions() {
        this.buildFetcher();
        HashMap<TopicPartition, Long> expectedOffsets = new HashMap<TopicPartition, Long>();
        expectedOffsets.put(this.tp0, 5L);
        expectedOffsets.put(this.tp1, 7L);
        expectedOffsets.put(this.tp2, 9L);
        this.assignFromUser(expectedOffsets.keySet());
        this.client.prepareResponse((AbstractResponse)this.listOffsetResponse(expectedOffsets, Errors.NONE, -1L, -1));
        Assertions.assertEquals(expectedOffsets, (Object)this.offsetFetcher.endOffsets(Arrays.asList(this.tp0, this.tp1, this.tp2), this.time.timer(5000L)));
    }

    @Test
    public void testEndOffsetsEmpty() {
        this.buildFetcher();
        Assertions.assertEquals(Collections.emptyMap(), (Object)this.offsetFetcher.endOffsets(Collections.emptyList(), this.time.timer(5000L)));
    }

    private MockClient.RequestMatcher offsetsForLeaderEpochRequestMatcher(TopicPartition topicPartition) {
        int currentLeaderEpoch = 1;
        int leaderEpoch = 1;
        return request -> {
            OffsetsForLeaderEpochRequest epochRequest = (OffsetsForLeaderEpochRequest)request;
            OffsetForLeaderEpochRequestData.OffsetForLeaderPartition partition = this.offsetForLeaderPartitionMap(epochRequest.data()).get(topicPartition);
            return partition != null && partition.currentLeaderEpoch() == currentLeaderEpoch && partition.leaderEpoch() == leaderEpoch;
        };
    }

    private OffsetsForLeaderEpochResponse prepareOffsetsForLeaderEpochResponse(TopicPartition topicPartition, int leaderEpoch, long endOffset) {
        OffsetForLeaderEpochResponseData data = new OffsetForLeaderEpochResponseData();
        data.topics().add((ImplicitLinkedHashCollection.Element)new OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult().setTopic(topicPartition.topic()).setPartitions(Collections.singletonList(new OffsetForLeaderEpochResponseData.EpochEndOffset().setPartition(topicPartition.partition()).setErrorCode(Errors.NONE.code()).setLeaderEpoch(leaderEpoch).setEndOffset(endOffset))));
        return new OffsetsForLeaderEpochResponse(data);
    }

    private Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> offsetForLeaderPartitionMap(OffsetForLeaderEpochRequestData data) {
        HashMap<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> result = new HashMap<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition>();
        data.topics().forEach(topic -> topic.partitions().forEach(partition -> result.put(new TopicPartition(topic.topic(), partition.partition()), (OffsetForLeaderEpochRequestData.OffsetForLeaderPartition)partition)));
        return result;
    }

    private MockClient.RequestMatcher listOffsetRequestMatcher(long timestamp) {
        return this.listOffsetRequestMatcher(timestamp, -1);
    }

    private MockClient.RequestMatcher listOffsetRequestMatcher(long timestamp, int leaderEpoch) {
        return body -> {
            ListOffsetsRequest req = (ListOffsetsRequest)body;
            ListOffsetsRequestData.ListOffsetsTopic topic = (ListOffsetsRequestData.ListOffsetsTopic)req.topics().get(0);
            ListOffsetsRequestData.ListOffsetsPartition partition = (ListOffsetsRequestData.ListOffsetsPartition)topic.partitions().get(0);
            Assertions.assertEquals((int)30000, (int)req.timeoutMs());
            return this.tp0.topic().equals(topic.name()) && this.tp0.partition() == partition.partitionIndex() && timestamp == partition.timestamp() && leaderEpoch == partition.currentLeaderEpoch();
        };
    }

    private ListOffsetsResponse listOffsetResponse(Errors error, long timestamp, long offset) {
        return this.listOffsetResponse(this.tp0, error, timestamp, offset);
    }

    private ListOffsetsResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
        return this.listOffsetResponse(tp, error, timestamp, offset, -1);
    }

    private ListOffsetsResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset, int leaderEpoch) {
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        offsets.put(tp, offset);
        return this.listOffsetResponse(offsets, error, timestamp, leaderEpoch);
    }

    private ListOffsetsResponse listOffsetResponse(Map<TopicPartition, Long> offsets, Errors error, long timestamp, int leaderEpoch) {
        HashMap responses = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
            TopicPartition topicPartition = entry.getKey();
            responses.putIfAbsent(topicPartition.topic(), new ArrayList());
            ((List)responses.get(topicPartition.topic())).add(new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(topicPartition.partition()).setErrorCode(error.code()).setOffset(entry.getValue().longValue()).setTimestamp(timestamp).setLeaderEpoch(leaderEpoch));
        }
        ArrayList<ListOffsetsResponseData.ListOffsetsTopicResponse> topics = new ArrayList<ListOffsetsResponseData.ListOffsetsTopicResponse>();
        for (Map.Entry entry : responses.entrySet()) {
            topics.add(new ListOffsetsResponseData.ListOffsetsTopicResponse().setName((String)entry.getKey()).setPartitions((List)entry.getValue()));
        }
        ListOffsetsResponseData listOffsetsResponseData = new ListOffsetsResponseData().setTopics(topics);
        return new ListOffsetsResponse(listOffsetsResponseData);
    }

    private void buildFetcher() {
        this.buildFetcher(IsolationLevel.READ_UNCOMMITTED);
    }

    private void buildFetcher(AutoOffsetResetStrategy offsetResetStrategy) {
        this.buildFetcher(new MetricConfig(), offsetResetStrategy, IsolationLevel.READ_UNCOMMITTED);
    }

    private void buildFetcher(IsolationLevel isolationLevel) {
        this.buildFetcher(new MetricConfig(), AutoOffsetResetStrategy.EARLIEST, isolationLevel);
    }

    private void buildFetcher(MetricConfig metricConfig, AutoOffsetResetStrategy offsetResetStrategy, IsolationLevel isolationLevel) {
        long metadataExpireMs = Long.MAX_VALUE;
        LogContext logContext = new LogContext();
        SubscriptionState subscriptionState = new SubscriptionState(logContext, offsetResetStrategy);
        this.buildFetcher(metricConfig, isolationLevel, metadataExpireMs, subscriptionState, logContext);
    }

    private void buildFetcher(MetricConfig metricConfig, IsolationLevel isolationLevel, long metadataExpireMs, SubscriptionState subscriptionState, LogContext logContext) {
        this.buildDependencies(metricConfig, metadataExpireMs, subscriptionState, logContext);
        this.offsetFetcher = new OffsetFetcher(logContext, this.consumerClient, this.metadata, this.subscriptions, (Time)this.time, 100L, 30000, isolationLevel, this.apiVersions);
    }

    private void buildDependencies(MetricConfig metricConfig, long metadataExpireMs, SubscriptionState subscriptionState, LogContext logContext) {
        this.time = new MockTime(1L);
        this.subscriptions = subscriptionState;
        this.metadata = new ConsumerMetadata(0L, 0L, metadataExpireMs, false, false, this.subscriptions, logContext, new ClusterResourceListeners());
        this.client = new MockClient((Time)this.time, (Metadata)this.metadata);
        this.metrics = new Metrics(metricConfig, (Time)this.time);
        this.consumerClient = new ConsumerNetworkClient(logContext, (KafkaClient)this.client, (Metadata)this.metadata, (Time)this.time, 100L, 1000, Integer.MAX_VALUE);
    }
}

