package org.apache.kafka.clients.consumer;

import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignorTest;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.MockRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.AbstractConfigTest;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.network.KafkaChannelTest;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
import org.apache.kafka.common.telemetry.internals.ClientTelemetrySender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.internal.stubbing.answers.CallsRealMethods;

/* loaded from: input_file:org/apache/kafka/clients/consumer/KafkaConsumerTest.class */
public class KafkaConsumerTest {
    private final String topic = "test";
    private final Uuid topicId = Uuid.randomUuid();
    private final TopicPartition tp0 = new TopicPartition("test", 0);
    private final TopicPartition tp1 = new TopicPartition("test", 1);
    private final String topic2 = "test2";
    private final Uuid topicId2 = Uuid.randomUuid();
    private final TopicPartition t2p0 = new TopicPartition("test2", 0);
    private final String topic3 = "test3";
    private final Uuid topicId3 = Uuid.randomUuid();
    private final TopicPartition t3p0 = new TopicPartition("test3", 0);
    private final int sessionTimeoutMs = 10000;
    private final int defaultApiTimeoutMs = 60000;
    private final int heartbeatIntervalMs = 1000;
    private final int autoCommitIntervalMs = 500;
    private final String groupId = "mock-group";
    private final String memberId = "memberId";
    private final String leaderId = "leaderId";
    private final Optional<String> groupInstanceId = Optional.of("mock-instance");
    private Map<String, Uuid> topicIds = (Map) Stream.of((Object[]) new AbstractMap.SimpleEntry[]{new AbstractMap.SimpleEntry("test", this.topicId), new AbstractMap.SimpleEntry("test2", this.topicId2), new AbstractMap.SimpleEntry("test3", this.topicId3)}).collect(Collectors.toMap((v0) -> {
        return v0.getKey();
    }, (v0) -> {
        return v0.getValue();
    }));
    private Map<Uuid, String> topicNames = (Map) Stream.of((Object[]) new AbstractMap.SimpleEntry[]{new AbstractMap.SimpleEntry(this.topicId, "test"), new AbstractMap.SimpleEntry(this.topicId2, "test2"), new AbstractMap.SimpleEntry(this.topicId3, "test3")}).collect(Collectors.toMap((v0) -> {
        return v0.getKey();
    }, (v0) -> {
        return v0.getValue();
    }));
    private final String partitionRevoked = "Hit partition revoke ";
    private final String partitionAssigned = "Hit partition assign ";
    private final String partitionLost = "Hit partition lost ";
    private final Collection<TopicPartition> singleTopicPartition = Collections.singleton(new TopicPartition("test", 0));
    private final Time time = new MockTime();
    private final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
    private final ConsumerPartitionAssignor assignor = new RoundRobinAssignor();
    private KafkaConsumer<?, ?> consumer;
    private static final List<String> CLIENT_IDS = new ArrayList();

    /* loaded from: input_file:org/apache/kafka/clients/consumer/KafkaConsumerTest$ConsumerInterceptorForClientId.class */
    public static class ConsumerInterceptorForClientId implements ConsumerInterceptor<byte[], byte[]> {
        public ConsumerRecords<byte[], byte[]> onConsume(ConsumerRecords<byte[], byte[]> consumerRecords) {
            return consumerRecords;
        }

        public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        }

        public void close() {
        }

        public void configure(Map<String, ?> map) {
            KafkaConsumerTest.CLIENT_IDS.add(map.get("client.id").toString());
        }
    }

    /* loaded from: input_file:org/apache/kafka/clients/consumer/KafkaConsumerTest$DeserializerForClientId.class */
    public static class DeserializerForClientId implements Deserializer<byte[]> {
        public void configure(Map<String, ?> map, boolean z) {
            KafkaConsumerTest.CLIENT_IDS.add(map.get("client.id").toString());
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public byte[] m15deserialize(String str, byte[] bArr) {
            return bArr;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/KafkaConsumerTest$FetchInfo.class */
    public static class FetchInfo {
        long logFirstOffset;
        long logLastOffset;
        long offset;
        int count;

        FetchInfo(long j, int i) {
            this(0L, j + i, j, i);
        }

        FetchInfo(long j, long j2, long j3, int i) {
            this.logFirstOffset = j;
            this.logLastOffset = j2;
            this.offset = j3;
            this.count = i;
        }
    }

    @AfterEach
    public void cleanup() {
        if (this.consumer != null) {
            this.consumer.close(Duration.ZERO);
        }
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testMetricsReporterAutoGeneratedClientId(GroupProtocol groupProtocol) {
        Properties properties = new Properties();
        properties.setProperty("group.protocol", groupProtocol.name());
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty(AbstractConfigTest.TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
        this.consumer = newConsumer(properties, (Deserializer) new StringDeserializer(), (Deserializer) new StringDeserializer());
        Assertions.assertEquals(3, this.consumer.metricsRegistry().reporters().size());
        Assertions.assertEquals(this.consumer.clientId(), ((MockMetricsReporter) this.consumer.metricsRegistry().reporters().stream().filter(metricsReporter -> {
            return metricsReporter instanceof MockMetricsReporter;
        }).findFirst().get()).clientId);
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testDisableJmxAndClientTelemetryReporter(GroupProtocol groupProtocol) {
        Properties properties = new Properties();
        properties.setProperty("group.protocol", groupProtocol.name());
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty("auto.include.jmx.reporter", "false");
        properties.setProperty("enable.metrics.push", "false");
        this.consumer = newConsumer(properties, (Deserializer) new StringDeserializer(), (Deserializer) new StringDeserializer());
        Assertions.assertTrue(this.consumer.metricsRegistry().reporters().isEmpty());
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testExplicitlyOnlyEnableJmxReporter(GroupProtocol groupProtocol) {
        Properties properties = new Properties();
        properties.setProperty("group.protocol", groupProtocol.name());
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty(AbstractConfigTest.TestConfig.METRIC_REPORTER_CLASSES_CONFIG, "org.apache.kafka.common.metrics.JmxReporter");
        properties.setProperty("enable.metrics.push", "false");
        this.consumer = newConsumer(properties, (Deserializer) new StringDeserializer(), (Deserializer) new StringDeserializer());
        Assertions.assertEquals(1, this.consumer.metricsRegistry().reporters().size());
        Assertions.assertTrue(this.consumer.metricsRegistry().reporters().get(0) instanceof JmxReporter);
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testExplicitlyOnlyEnableClientTelemetryReporter(GroupProtocol groupProtocol) {
        Properties properties = new Properties();
        properties.setProperty("group.protocol", groupProtocol.name());
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty("auto.include.jmx.reporter", "false");
        this.consumer = newConsumer(properties, (Deserializer) new StringDeserializer(), (Deserializer) new StringDeserializer());
        Assertions.assertEquals(1, this.consumer.metricsRegistry().reporters().size());
        Assertions.assertTrue(this.consumer.metricsRegistry().reporters().get(0) instanceof ClientTelemetryReporter);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testPollReturnsRecords(GroupProtocol groupProtocol) {
        this.consumer = setUpConsumerWithRecordsToPoll(groupProtocol, this.tp0, 5);
        ConsumerRecords poll = this.consumer.poll(Duration.ZERO);
        Assertions.assertEquals(poll.count(), 5);
        Assertions.assertEquals(poll.partitions(), Collections.singleton(this.tp0));
        Assertions.assertEquals(poll.records(this.tp0).size(), 5);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testSecondPollWithDeserializationErrorThrowsRecordDeserializationException(GroupProtocol groupProtocol) {
        this.consumer = setUpConsumerWithRecordsToPoll(groupProtocol, this.tp0, 5, mockErrorDeserializer(4));
        ConsumerRecords poll = this.consumer.poll(Duration.ZERO);
        Assertions.assertEquals(4 - 1, poll.count());
        Assertions.assertEquals(Collections.singleton(this.tp0), poll.partitions());
        Assertions.assertEquals(4 - 1, poll.records(this.tp0).size());
        Assertions.assertEquals(4 - 2, ((ConsumerRecord) poll.records(this.tp0).get(poll.records(this.tp0).size() - 1)).offset());
        RecordDeserializationException assertThrows = Assertions.assertThrows(RecordDeserializationException.class, () -> {
            this.consumer.poll(Duration.ZERO);
        });
        Assertions.assertEquals(3, assertThrows.offset());
        Assertions.assertEquals(this.tp0, assertThrows.topicPartition());
        Assertions.assertEquals(assertThrows.offset(), this.consumer.position(this.tp0));
    }

    private StringDeserializer mockErrorDeserializer(int i) {
        final int i2 = i - 1;
        return new StringDeserializer() { // from class: org.apache.kafka.clients.consumer.KafkaConsumerTest.1
            int i = 0;

            /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
            public String m14deserialize(String str, byte[] bArr) {
                if (this.i == i2) {
                    throw new SerializationException();
                }
                this.i++;
                return super.deserialize(str, bArr);
            }

            /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
            public String m13deserialize(String str, Headers headers, ByteBuffer byteBuffer) {
                if (this.i == i2) {
                    throw new SerializationException();
                }
                this.i++;
                return super.deserialize(str, headers, byteBuffer);
            }
        };
    }

    private KafkaConsumer<?, ?> setUpConsumerWithRecordsToPoll(GroupProtocol groupProtocol, TopicPartition topicPartition, int i) {
        return setUpConsumerWithRecordsToPoll(groupProtocol, topicPartition, i, new StringDeserializer());
    }

    private KafkaConsumer<?, ?> setUpConsumerWithRecordsToPoll(GroupProtocol groupProtocol, TopicPartition topicPartition, int i, Deserializer<String> deserializer) {
        Node node = (Node) TestUtils.singletonCluster(topicPartition.topic(), 1).nodes().get(0);
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, "mock-group", this.groupInstanceId, Optional.of(deserializer), false);
        this.consumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(this.consumer));
        prepareRebalance(mockClient, node, this.assignor, Collections.singletonList(topicPartition), null);
        this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        mockClient.prepareResponseFrom(fetchResponse(topicPartition, 0L, i), node);
        return this.consumer;
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testConstructorClose(GroupProtocol groupProtocol) {
        Properties properties = new Properties();
        properties.setProperty("group.protocol", groupProtocol.name());
        properties.setProperty("client.id", "testConstructorClose");
        properties.setProperty("bootstrap.servers", "invalid-23-8409-adsfsdj");
        properties.setProperty(AbstractConfigTest.TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
        int i = MockMetricsReporter.INIT_COUNT.get();
        int i2 = MockMetricsReporter.CLOSE_COUNT.get();
        try {
            newConsumer(properties, (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer());
            Assertions.fail("should have caught an exception and returned");
        } catch (KafkaException e) {
            Assertions.assertEquals(i + 1, MockMetricsReporter.INIT_COUNT.get());
            Assertions.assertEquals(i2 + 1, MockMetricsReporter.CLOSE_COUNT.get());
            Assertions.assertEquals("Failed to construct kafka consumer", e.getMessage());
        }
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testOsDefaultSocketBufferSizes(GroupProtocol groupProtocol) {
        HashMap hashMap = new HashMap();
        hashMap.put("group.protocol", groupProtocol.name());
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("send.buffer.bytes", -1);
        hashMap.put("receive.buffer.bytes", -1);
        this.consumer = newConsumer((Map<String, Object>) hashMap, (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer());
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testInvalidSocketSendBufferSize(GroupProtocol groupProtocol) {
        HashMap hashMap = new HashMap();
        hashMap.put("group.protocol", groupProtocol.name());
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("send.buffer.bytes", -2);
        Assertions.assertThrows(KafkaException.class, () -> {
            newConsumer((Map<String, Object>) hashMap, (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer());
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testInvalidSocketReceiveBufferSize(GroupProtocol groupProtocol) {
        HashMap hashMap = new HashMap();
        hashMap.put("group.protocol", groupProtocol.name());
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("receive.buffer.bytes", -2);
        Assertions.assertThrows(KafkaException.class, () -> {
            newConsumer((Map<String, Object>) hashMap, (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer());
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void shouldIgnoreGroupInstanceIdForEmptyGroupId(GroupProtocol groupProtocol) {
        HashMap hashMap = new HashMap();
        hashMap.put("group.protocol", groupProtocol.name());
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("group.instance.id", "instance_id");
        this.consumer = newConsumer((Map<String, Object>) hashMap, (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer());
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testSubscription(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, "mock-group");
        this.consumer.subscribe(Collections.singletonList("test"));
        Assertions.assertEquals(Collections.singleton("test"), this.consumer.subscription());
        Assertions.assertTrue(this.consumer.assignment().isEmpty());
        this.consumer.subscribe(Collections.emptyList());
        Assertions.assertTrue(this.consumer.subscription().isEmpty());
        Assertions.assertTrue(this.consumer.assignment().isEmpty());
        this.consumer.assign(Collections.singletonList(this.tp0));
        Assertions.assertTrue(this.consumer.subscription().isEmpty());
        Assertions.assertEquals(Collections.singleton(this.tp0), this.consumer.assignment());
        this.consumer.unsubscribe();
        Assertions.assertTrue(this.consumer.subscription().isEmpty());
        Assertions.assertTrue(this.consumer.assignment().isEmpty());
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testSubscriptionOnNullTopicCollection(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, "mock-group");
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.consumer.subscribe((List) null);
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testSubscriptionOnNullTopic(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, "mock-group");
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.consumer.subscribe(Collections.singletonList(null));
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testSubscriptionOnEmptyTopic(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, "mock-group");
        String str = "  ";
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.consumer.subscribe(Collections.singletonList(str));
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testSubscriptionOnNullPattern(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, "mock-group");
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.consumer.subscribe((Pattern) null);
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testSubscriptionOnEmptyPattern(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, "mock-group");
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.consumer.subscribe(Pattern.compile(""));
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testSubscriptionWithEmptyPartitionAssignment(GroupProtocol groupProtocol) {
        Properties properties = new Properties();
        properties.setProperty("group.protocol", groupProtocol.name());
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty("partition.assignment.strategy", "");
        properties.setProperty("group.id", "mock-group");
        this.consumer = newConsumer(properties, (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer());
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.consumer.subscribe(Collections.singletonList("test"));
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testSeekNegative(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, null);
        this.consumer.assign(Collections.singleton(new TopicPartition("nonExistTopic", 0)));
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.consumer.seek(new TopicPartition("nonExistTopic", 0), -1L);
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testAssignOnNullTopicPartition(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, null);
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.consumer.assign((Collection) null);
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testAssignOnEmptyTopicPartition(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, "mock-group");
        this.consumer.assign(Collections.emptyList());
        Assertions.assertTrue(this.consumer.subscription().isEmpty());
        Assertions.assertTrue(this.consumer.assignment().isEmpty());
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testAssignOnNullTopicInPartition(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, null);
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.consumer.assign(Collections.singleton(new TopicPartition((String) null, 0)));
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testAssignOnEmptyTopicInPartition(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, null);
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.consumer.assign(Collections.singleton(new TopicPartition("  ", 0)));
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testInterceptorConstructorClose(GroupProtocol groupProtocol) {
        try {
            Properties properties = new Properties();
            properties.setProperty("group.protocol", groupProtocol.name());
            properties.setProperty("bootstrap.servers", "localhost:9999");
            properties.setProperty("interceptor.classes", MockConsumerInterceptor.class.getName());
            this.consumer = newConsumer(properties, (Deserializer) new StringDeserializer(), (Deserializer) new StringDeserializer());
            Assertions.assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
            Assertions.assertEquals(0, MockConsumerInterceptor.CLOSE_COUNT.get());
            this.consumer.close(Duration.ZERO);
            Assertions.assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
            Assertions.assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
            Assertions.assertNull(MockConsumerInterceptor.CLUSTER_META.get());
        } finally {
            MockConsumerInterceptor.resetCounters();
        }
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances(GroupProtocol groupProtocol) {
        try {
            Properties properties = new Properties();
            properties.setProperty("group.protocol", groupProtocol.name());
            properties.setProperty("bootstrap.servers", "localhost:9999");
            properties.setProperty("interceptor.classes", MockConsumerInterceptor.class.getName() + ", " + MockConsumerInterceptor.class.getName() + ", " + MockConsumerInterceptor.class.getName());
            MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(3);
            Assertions.assertThrows(KafkaException.class, () -> {
                newConsumer(properties, (Deserializer) new StringDeserializer(), (Deserializer) new StringDeserializer());
            });
            Assertions.assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
            Assertions.assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get());
        } finally {
            MockConsumerInterceptor.resetCounters();
        }
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testPause(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, "mock-group");
        this.consumer.assign(Collections.singletonList(this.tp0));
        Assertions.assertEquals(Collections.singleton(this.tp0), this.consumer.assignment());
        Assertions.assertTrue(this.consumer.paused().isEmpty());
        this.consumer.pause(Collections.singleton(this.tp0));
        Assertions.assertEquals(Collections.singleton(this.tp0), this.consumer.paused());
        this.consumer.resume(Collections.singleton(this.tp0));
        Assertions.assertTrue(this.consumer.paused().isEmpty());
        this.consumer.unsubscribe();
        Assertions.assertTrue(this.consumer.paused().isEmpty());
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testConsumerJmxPrefix(GroupProtocol groupProtocol) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("group.protocol", groupProtocol.name());
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("send.buffer.bytes", -1);
        hashMap.put("receive.buffer.bytes", -1);
        hashMap.put("client.id", "client-1");
        this.consumer = newConsumer((Map<String, Object>) hashMap, (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer());
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        this.consumer.metricsRegistry().addMetric(this.consumer.metricsRegistry().metricName("test-metric", "grp1", "test metric"), new Avg());
        Assertions.assertNotNull(platformMBeanServer.getObjectInstance(new ObjectName("kafka.consumer:type=grp1,client-id=client-1")));
    }

    private KafkaConsumer<byte[], byte[]> newConsumer(GroupProtocol groupProtocol, String str) {
        return newConsumer(groupProtocol, str, Optional.empty());
    }

    private KafkaConsumer<byte[], byte[]> newConsumer(GroupProtocol groupProtocol, String str, Optional<Boolean> optional) {
        Properties properties = new Properties();
        properties.setProperty("group.protocol", groupProtocol.name());
        properties.setProperty("client.id", "my.consumer");
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty(AbstractConfigTest.TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
        if (str != null) {
            properties.setProperty("group.id", str);
        }
        optional.ifPresent(bool -> {
            properties.setProperty("enable.auto.commit", bool.toString());
        });
        return newConsumer(properties, (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer());
    }

    private <K, V> KafkaConsumer<K, V> newConsumer(Properties properties) {
        return newConsumer(properties, (Deserializer) null, (Deserializer) null);
    }

    private <K, V> KafkaConsumer<K, V> newConsumer(Map<String, Object> map, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        return new KafkaConsumer<>(new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(map, deserializer, deserializer2)), deserializer, deserializer2);
    }

    private <K, V> KafkaConsumer<K, V> newConsumer(Properties properties, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        return newConsumer(Utils.propsToMap(properties), deserializer, deserializer2);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void verifyHeartbeatSent(GroupProtocol groupProtocol) throws Exception {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        this.consumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(this.consumer));
        Node prepareRebalance = prepareRebalance(mockClient, node, this.assignor, Collections.singletonList(this.tp0), null);
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 0L, 0), node);
        this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assertions.assertEquals(Collections.singleton(this.tp0), this.consumer.assignment());
        AtomicBoolean prepareHeartbeatResponse = prepareHeartbeatResponse(mockClient, prepareRebalance, Errors.NONE);
        this.time.sleep(1000L);
        Thread.sleep(1000L);
        this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assertions.assertTrue(prepareHeartbeatResponse.get());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void verifyHeartbeatSentWhenFetchedDataReady(GroupProtocol groupProtocol) throws Exception {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        this.consumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(this.consumer));
        Node prepareRebalance = prepareRebalance(mockClient, node, this.assignor, Collections.singletonList(this.tp0), null);
        this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        this.consumer.poll(Duration.ZERO);
        mockClient.respondFrom(fetchResponse(this.tp0, 0L, 5), node);
        mockClient.poll(0L, this.time.milliseconds());
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 5L, 0), node);
        AtomicBoolean prepareHeartbeatResponse = prepareHeartbeatResponse(mockClient, prepareRebalance, Errors.NONE);
        this.time.sleep(1000L);
        Thread.sleep(1000L);
        this.consumer.poll(Duration.ZERO);
        Assertions.assertTrue(prepareHeartbeatResponse.get());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void verifyPollTimesOutDuringMetadataUpdate(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        this.consumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(this.consumer));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        mockClient.prepareResponseFrom(joinGroupFollowerResponse(this.assignor, 1, "memberId", "leaderId", Errors.NONE), new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()));
        this.consumer.poll(Duration.ZERO);
        Assertions.assertEquals(0L, mockClient.requests().stream().filter(clientRequest -> {
            return clientRequest.apiKey().equals(ApiKeys.FETCH);
        }).count());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        this.consumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(this.consumer));
        prepareRebalance(mockClient, node, this.assignor, Collections.singletonList(this.tp0), null);
        this.consumer.poll(0L);
        Queue<ClientRequest> requests = mockClient.requests();
        Assertions.assertEquals(1, requests.size());
        Assertions.assertEquals(FetchRequest.Builder.class, requests.peek().requestBuilder().getClass());
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void verifyNoCoordinatorLookupForManualAssignmentWithSeek(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, false, null, this.groupInstanceId, false);
        this.consumer.assign(Collections.singleton(this.tp0));
        this.consumer.seekToBeginning(Collections.singleton(this.tp0));
        mockClient.prepareResponse(listOffsetsResponse(Collections.singletonMap(this.tp0, 50L)));
        mockClient.prepareResponse(fetchResponse(this.tp0, 50L, 5));
        Assertions.assertEquals(5, this.consumer.poll(Duration.ofMillis(1L)).count());
        Assertions.assertEquals(55L, this.consumer.position(this.tp0));
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        this.consumer.assign(Collections.singleton(this.tp0));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, "mock-group", node), node);
        this.consumer.poll(Duration.ofMillis(0L));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        mockClient.prepareResponse(offsetResponse(Collections.singletonMap(this.tp0, 50L), Errors.NONE));
        mockClient.prepareResponse(fetchResponse(this.tp0, 50L, 5));
        Assertions.assertEquals(5, this.consumer.poll(Duration.ofMillis(0L)).count());
        Assertions.assertEquals(55L, this.consumer.position(this.tp0));
        mockClient.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.tp0, Errors.NONE)));
        this.consumer.commitSync(Collections.singletonMap(this.tp0, new OffsetAndMetadata(55L)));
        mockClient.prepareResponse(offsetResponse(Collections.singletonMap(this.tp0, 55L), Errors.NONE));
        Assertions.assertEquals(55L, ((OffsetAndMetadata) this.consumer.committed(Collections.singleton(this.tp0), Duration.ZERO).get(this.tp0)).offset());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testFetchProgressWithMissingPartitionPosition(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 2));
        this.consumer = newConsumerNoAutoCommit(groupProtocol, this.time, mockClient, this.subscription, createMetadata);
        this.consumer.assign(Arrays.asList(this.tp0, this.tp1));
        this.consumer.seekToEnd(Collections.singleton(this.tp0));
        this.consumer.seekToBeginning(Collections.singleton(this.tp1));
        mockClient.prepareResponse(abstractRequest -> {
            List list = (List) ((ListOffsetsRequest) abstractRequest).topics().stream().flatMap(listOffsetsTopic -> {
                return listOffsetsTopic.name().equals("test") ? Stream.of(listOffsetsTopic.partitions()) : Stream.empty();
            }).flatMap((v0) -> {
                return v0.stream();
            }).collect(Collectors.toList());
            return list.contains(new ListOffsetsRequestData.ListOffsetsPartition().setPartitionIndex(this.tp0.partition()).setTimestamp(-1L)) && list.contains(new ListOffsetsRequestData.ListOffsetsPartition().setPartitionIndex(this.tp1.partition()).setTimestamp(-2L));
        }, (AbstractResponse) listOffsetsResponse(Collections.singletonMap(this.tp0, 50L), Collections.singletonMap(this.tp1, Errors.NOT_LEADER_OR_FOLLOWER)));
        mockClient.prepareResponse(abstractRequest2 -> {
            FetchRequest fetchRequest = (FetchRequest) abstractRequest2;
            Map<TopicIdPartition, FetchRequestData.FetchPartition> fetchPartitionMap = RequestTestUtils.fetchPartitionMap(fetchRequest.data(), Short.valueOf(fetchRequest.version()), this.topicNames);
            TopicIdPartition topicIdPartition = new TopicIdPartition(this.topicIds.get(this.tp0.topic()), this.tp0);
            return fetchPartitionMap.keySet().equals(Collections.singleton(topicIdPartition)) && fetchPartitionMap.get(topicIdPartition).fetchOffset() == 50;
        }, (AbstractResponse) fetchResponse(this.tp0, 50L, 5));
        ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(1L));
        Assertions.assertEquals(5, poll.count());
        Assertions.assertEquals(Collections.singleton(this.tp0), poll.partitions());
    }

    private void initMetadata(MockClient mockClient, Map<String, Integer> map) {
        HashMap hashMap = new HashMap();
        for (String str : map.keySet()) {
            hashMap.put(str, this.topicIds.get(str));
        }
        mockClient.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, map, hashMap));
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testMissingOffsetNoResetPolicy(GroupProtocol groupProtocol) {
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, subscriptionState, createMetadata, this.assignor, true, "mock-group", this.groupInstanceId, false);
        this.consumer.assign(Collections.singletonList(this.tp0));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, -1L), Errors.NONE), new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()));
        Assertions.assertThrows(NoOffsetForPartitionException.class, () -> {
            this.consumer.poll(Duration.ZERO);
        });
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testResetToCommittedOffset(GroupProtocol groupProtocol) {
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(groupProtocol, this.time, mockClient, subscriptionState, createMetadata, this.assignor, true, "mock-group", this.groupInstanceId, false);
        newConsumer.assign(Collections.singletonList(this.tp0));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, 539L), Errors.NONE), new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()));
        newConsumer.poll(Duration.ZERO);
        Assertions.assertEquals(539L, newConsumer.position(this.tp0));
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testResetUsingAutoResetPolicy(GroupProtocol groupProtocol) {
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, subscriptionState, createMetadata, this.assignor, true, "mock-group", this.groupInstanceId, false);
        this.consumer.assign(Collections.singletonList(this.tp0));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, -1L), Errors.NONE), new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()));
        mockClient.prepareResponse(listOffsetsResponse(Collections.singletonMap(this.tp0, 50L)));
        this.consumer.poll(Duration.ZERO);
        Assertions.assertEquals(50L, this.consumer.position(this.tp0));
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testOffsetIsValidAfterSeek(GroupProtocol groupProtocol) {
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, subscriptionState, createMetadata, this.assignor, true, "mock-group", Optional.empty(), false);
        this.consumer.assign(Collections.singletonList(this.tp0));
        this.consumer.seek(this.tp0, 20L);
        this.consumer.poll(Duration.ZERO);
        Assertions.assertEquals(subscriptionState.validPosition(this.tp0).offset, 20L);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testCommitsFetchedDuringAssign(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 2));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        this.consumer.assign(Collections.singletonList(this.tp0));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        Node node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, 10000L), Errors.NONE), node2);
        Assertions.assertEquals(10000L, ((OffsetAndMetadata) this.consumer.committed(Collections.singleton(this.tp0)).get(this.tp0)).offset());
        this.consumer.assign(Arrays.asList(this.tp0, this.tp1));
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, 10000L);
        mockClient.prepareResponseFrom(offsetResponse(hashMap, Errors.NONE), node2);
        Assertions.assertEquals(10000L, ((OffsetAndMetadata) this.consumer.committed(Collections.singleton(this.tp0)).get(this.tp0)).offset());
        hashMap.remove(this.tp0);
        hashMap.put(this.tp1, 20000L);
        mockClient.prepareResponseFrom(offsetResponse(hashMap, Errors.NONE), node2);
        Assertions.assertEquals(20000L, ((OffsetAndMetadata) this.consumer.committed(Collections.singleton(this.tp1)).get(this.tp1)).offset());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testFetchStableOffsetThrowInCommitted(GroupProtocol groupProtocol) {
        Assertions.assertThrows(UnsupportedVersionException.class, () -> {
            setupThrowableConsumer(groupProtocol).committed(Collections.singleton(this.tp0));
        });
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testFetchStableOffsetThrowInPoll(GroupProtocol groupProtocol) {
        Assertions.assertThrows(UnsupportedVersionException.class, () -> {
            setupThrowableConsumer(groupProtocol).poll(Duration.ZERO);
        });
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testFetchStableOffsetThrowInPosition(GroupProtocol groupProtocol) {
        Assertions.assertThrows(UnsupportedVersionException.class, () -> {
            setupThrowableConsumer(groupProtocol).position(this.tp0);
        });
    }

    private KafkaConsumer<?, ?> setupThrowableConsumer(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 2));
        mockClient.setNodeApiVersions(NodeApiVersions.create(ApiKeys.OFFSET_FETCH.id, (short) 0, (short) 6));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, "mock-group", this.groupInstanceId, true);
        this.consumer.assign(Collections.singletonList(this.tp0));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, 10000L), Errors.NONE), new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()));
        return this.consumer;
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testNoCommittedOffsets(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 2));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        this.consumer.assign(Arrays.asList(this.tp0, this.tp1));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        mockClient.prepareResponseFrom(offsetResponse(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.tp0, 10000L), Utils.mkEntry(this.tp1, -1L)}), Errors.NONE), new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()));
        Map committed = this.consumer.committed(Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1}));
        Assertions.assertEquals(2, committed.size());
        Assertions.assertEquals(10000L, ((OffsetAndMetadata) committed.get(this.tp0)).offset());
        Assertions.assertNull(committed.get(this.tp1));
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testAutoCommitSentBeforePositionUpdate(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        this.consumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(this.consumer));
        Node prepareRebalance = prepareRebalance(mockClient, node, this.assignor, Collections.singletonList(this.tp0), null);
        this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        this.consumer.poll(Duration.ZERO);
        mockClient.respondFrom(fetchResponse(this.tp0, 0L, 5), node);
        mockClient.poll(0L, this.time.milliseconds());
        this.time.sleep(500L);
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 5L, 0), node);
        AtomicBoolean prepareOffsetCommitResponse = prepareOffsetCommitResponse(mockClient, prepareRebalance, this.tp0, 0L);
        this.consumer.poll(Duration.ZERO);
        Assertions.assertTrue(prepareOffsetCommitResponse.get());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testRegexSubscription(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        HashMap hashMap = new HashMap();
        hashMap.put("test", 1);
        hashMap.put("unmatched", 1);
        this.topicIds.put("unmatched", Uuid.randomUuid());
        initMetadata(mockClient, hashMap);
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        prepareRebalance(mockClient, node, Collections.singleton("test"), this.assignor, Collections.singletonList(this.tp0), null);
        this.consumer.subscribe(Pattern.compile("test"), getConsumerRebalanceListener(this.consumer));
        mockClient.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWithIds(1, hashMap, this.topicIds));
        this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assertions.assertEquals(Collections.singleton("test"), this.consumer.subscription());
        Assertions.assertEquals(Collections.singleton(this.tp0), this.consumer.assignment());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testChangingRegexSubscription(GroupProtocol groupProtocol) {
        TopicPartition topicPartition = new TopicPartition("other", 0);
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        HashMap hashMap = new HashMap();
        hashMap.put("test", 1);
        hashMap.put("other", 1);
        this.topicIds.put("other", Uuid.randomUuid());
        initMetadata(mockClient, hashMap);
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, false, this.groupInstanceId);
        Node prepareRebalance = prepareRebalance(mockClient, node, Collections.singleton("test"), this.assignor, Collections.singletonList(this.tp0), null);
        this.consumer.subscribe(Pattern.compile("test"), getConsumerRebalanceListener(this.consumer));
        this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        this.consumer.poll(Duration.ZERO);
        Assertions.assertEquals(Collections.singleton("test"), this.consumer.subscription());
        this.consumer.subscribe(Pattern.compile("other"), getConsumerRebalanceListener(this.consumer));
        mockClient.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWithIds(1, hashMap, this.topicIds));
        prepareRebalance(mockClient, node, Collections.singleton("other"), this.assignor, Collections.singletonList(topicPartition), prepareRebalance);
        this.consumer.poll(Duration.ZERO);
        Assertions.assertEquals(Collections.singleton("other"), this.consumer.subscription());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testWakeupWithFetchDataAvailable(GroupProtocol groupProtocol) throws Exception {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        this.consumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(this.consumer));
        prepareRebalance(mockClient, node, this.assignor, Collections.singletonList(this.tp0), null);
        this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        this.consumer.poll(Duration.ZERO);
        mockClient.respondFrom(fetchResponse(this.tp0, 0L, 5), node);
        mockClient.poll(0L, this.time.milliseconds());
        this.consumer.wakeup();
        Assertions.assertThrows(WakeupException.class, () -> {
            this.consumer.poll(Duration.ZERO);
        });
        Assertions.assertEquals(0L, this.consumer.position(this.tp0));
        Assertions.assertEquals(5, this.consumer.poll(Duration.ZERO).count());
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        newSingleThreadScheduledExecutor.scheduleAtFixedRate(() -> {
            this.time.sleep(10000L);
        }, 0L, 10L, TimeUnit.MILLISECONDS);
        this.consumer.close();
        newSingleThreadScheduledExecutor.shutdownNow();
        newSingleThreadScheduledExecutor.awaitTermination(5L, TimeUnit.SECONDS);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testPollThrowsInterruptExceptionIfInterrupted(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, false, this.groupInstanceId);
        this.consumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(this.consumer));
        prepareRebalance(mockClient, node, this.assignor, Collections.singletonList(this.tp0), null);
        this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        this.consumer.poll(Duration.ZERO);
        try {
            Thread.currentThread().interrupt();
            Assertions.assertThrows(InterruptException.class, () -> {
                this.consumer.poll(Duration.ZERO);
            });
        } finally {
            Thread.interrupted();
        }
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void fetchResponseWithUnexpectedPartitionIsIgnored(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        this.consumer.subscribe(Collections.singletonList("test"), getConsumerRebalanceListener(this.consumer));
        prepareRebalance(mockClient, node, this.assignor, Collections.singletonList(this.tp0), null);
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, new FetchInfo(0L, 1));
        hashMap.put(this.t2p0, new FetchInfo(0L, 10));
        mockClient.prepareResponseFrom(fetchResponse(hashMap), node);
        this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assertions.assertEquals(0, this.consumer.poll(Duration.ZERO).count());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testSubscriptionChangesWithAutoCommitEnabled(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        HashMap hashMap = new HashMap();
        hashMap.put("test", 1);
        hashMap.put("test2", 1);
        hashMap.put("test3", 1);
        initMetadata(mockClient, hashMap);
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RangeAssignor rangeAssignor = new RangeAssignor();
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, rangeAssignor, true, this.groupInstanceId);
        this.consumer.subscribe(Arrays.asList("test", "test2"), getConsumerRebalanceListener(this.consumer));
        Assertions.assertEquals(2, this.consumer.subscription().size());
        Assertions.assertTrue(this.consumer.subscription().contains("test") && this.consumer.subscription().contains("test2"));
        Assertions.assertTrue(this.consumer.assignment().isEmpty());
        Node prepareRebalance = prepareRebalance(mockClient, node, rangeAssignor, Arrays.asList(this.tp0, this.t2p0), null);
        this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        this.consumer.poll(Duration.ZERO);
        Assertions.assertEquals(2, this.consumer.subscription().size());
        Assertions.assertTrue(this.consumer.subscription().contains("test") && this.consumer.subscription().contains("test2"));
        Assertions.assertEquals(2, this.consumer.assignment().size());
        Assertions.assertTrue(this.consumer.assignment().contains(this.tp0) && this.consumer.assignment().contains(this.t2p0));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.tp0, new FetchInfo(0L, 1));
        hashMap2.put(this.t2p0, new FetchInfo(0L, 10));
        mockClient.respondFrom(fetchResponse(hashMap2), node);
        mockClient.poll(0L, this.time.milliseconds());
        ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(1L));
        hashMap2.put(this.tp0, new FetchInfo(1L, 0));
        hashMap2.put(this.t2p0, new FetchInfo(10L, 0));
        mockClient.respondFrom(fetchResponse(hashMap2), node);
        mockClient.poll(0L, this.time.milliseconds());
        Assertions.assertEquals(11, poll.count());
        Assertions.assertEquals(1L, this.consumer.position(this.tp0));
        Assertions.assertEquals(10L, this.consumer.position(this.t2p0));
        this.consumer.subscribe(Arrays.asList("test", "test3"), getConsumerRebalanceListener(this.consumer));
        Assertions.assertEquals(2, this.consumer.subscription().size());
        Assertions.assertTrue(this.consumer.subscription().contains("test") && this.consumer.subscription().contains("test3"));
        Assertions.assertEquals(2, this.consumer.assignment().size());
        Assertions.assertTrue(this.consumer.assignment().contains(this.tp0) && this.consumer.assignment().contains(this.t2p0));
        HashMap hashMap3 = new HashMap();
        hashMap3.put(this.tp0, 1L);
        hashMap3.put(this.t2p0, 10L);
        AtomicBoolean prepareOffsetCommitResponse = prepareOffsetCommitResponse(mockClient, prepareRebalance, hashMap3);
        prepareRebalance(mockClient, node, rangeAssignor, Arrays.asList(this.tp0, this.t3p0), prepareRebalance);
        HashMap hashMap4 = new HashMap();
        hashMap4.put(this.tp0, new FetchInfo(1L, 1));
        hashMap4.put(this.t3p0, new FetchInfo(0L, 100));
        mockClient.prepareResponse(fetchResponse(hashMap4));
        Assertions.assertEquals(101, this.consumer.poll(Duration.ofMillis(1L)).count());
        Assertions.assertEquals(2L, this.consumer.position(this.tp0));
        Assertions.assertEquals(100L, this.consumer.position(this.t3p0));
        Assertions.assertTrue(prepareOffsetCommitResponse.get());
        Assertions.assertEquals(2, this.consumer.subscription().size());
        Assertions.assertTrue(this.consumer.subscription().contains("test") && this.consumer.subscription().contains("test3"));
        Assertions.assertEquals(2, this.consumer.assignment().size());
        Assertions.assertTrue(this.consumer.assignment().contains(this.tp0) && this.consumer.assignment().contains(this.t3p0));
        this.consumer.unsubscribe();
        Assertions.assertTrue(this.consumer.subscription().isEmpty());
        Assertions.assertTrue(this.consumer.assignment().isEmpty());
        mockClient.requests().clear();
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testSubscriptionChangesWithAutoCommitDisabled(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        HashMap hashMap = new HashMap();
        hashMap.put("test", 1);
        hashMap.put("test2", 1);
        initMetadata(mockClient, hashMap);
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RangeAssignor rangeAssignor = new RangeAssignor();
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, rangeAssignor, false, this.groupInstanceId);
        initializeSubscriptionWithSingleTopic(this.consumer, getConsumerRebalanceListener(this.consumer));
        prepareRebalance(mockClient, node, rangeAssignor, Collections.singletonList(this.tp0), null);
        this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        this.consumer.poll(Duration.ZERO);
        Assertions.assertEquals(Collections.singleton("test"), this.consumer.subscription());
        Assertions.assertEquals(Collections.singleton(this.tp0), this.consumer.assignment());
        this.consumer.poll(Duration.ZERO);
        this.consumer.subscribe(Collections.singleton("test2"), getConsumerRebalanceListener(this.consumer));
        Assertions.assertEquals(Collections.singleton("test2"), this.consumer.subscription());
        Assertions.assertEquals(Collections.singleton(this.tp0), this.consumer.assignment());
        Iterator<ClientRequest> it = mockClient.requests().iterator();
        while (it.hasNext()) {
            Assertions.assertNotSame(ApiKeys.OFFSET_COMMIT, it.next().requestBuilder().apiKey());
        }
        this.consumer.unsubscribe();
        Assertions.assertEquals(Collections.emptySet(), this.consumer.subscription());
        Assertions.assertEquals(Collections.emptySet(), this.consumer.assignment());
        Iterator<ClientRequest> it2 = mockClient.requests().iterator();
        while (it2.hasNext()) {
            Assertions.assertNotSame(ApiKeys.OFFSET_COMMIT, it2.next().requestBuilder().apiKey());
        }
        mockClient.requests().clear();
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        CooperativeStickyAssignor cooperativeStickyAssignor = new CooperativeStickyAssignor();
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, cooperativeStickyAssignor, false, this.groupInstanceId);
        initializeSubscriptionWithSingleTopic(this.consumer, getExceptionConsumerRebalanceListener());
        prepareRebalance(mockClient, node, cooperativeStickyAssignor, Collections.singletonList(this.tp0), null);
        Assertions.assertEquals("Hit partition assign " + this.singleTopicPartition, ((RuntimeException) Assertions.assertThrows(RuntimeException.class, () -> {
            this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        })).getCause().getMessage());
        KafkaConsumer<?, ?> kafkaConsumer = this.consumer;
        kafkaConsumer.getClass();
        Assertions.assertEquals("Hit partition revoke " + this.singleTopicPartition, ((RuntimeException) Assertions.assertThrows(RuntimeException.class, kafkaConsumer::unsubscribe)).getCause().getMessage());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration(GroupProtocol groupProtocol) throws Exception {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        CooperativeStickyAssignor cooperativeStickyAssignor = new CooperativeStickyAssignor();
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, cooperativeStickyAssignor, false, this.groupInstanceId);
        initializeSubscriptionWithSingleTopic(this.consumer, getExceptionConsumerRebalanceListener());
        Node prepareRebalance = prepareRebalance(mockClient, node, cooperativeStickyAssignor, Collections.singletonList(this.tp0), null);
        Assertions.assertEquals("Hit partition assign " + this.singleTopicPartition, ((RuntimeException) Assertions.assertThrows(RuntimeException.class, () -> {
            this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        })).getCause().getMessage());
        AtomicBoolean prepareHeartbeatResponse = prepareHeartbeatResponse(mockClient, prepareRebalance, Errors.UNKNOWN_MEMBER_ID);
        this.time.sleep(1000L);
        prepareHeartbeatResponse.getClass();
        TestUtils.waitForCondition(prepareHeartbeatResponse::get, "Heartbeat response did not occur within timeout.");
        KafkaConsumer<?, ?> kafkaConsumer = this.consumer;
        kafkaConsumer.getClass();
        Assertions.assertEquals("Hit partition lost " + this.singleTopicPartition, ((RuntimeException) Assertions.assertThrows(RuntimeException.class, kafkaConsumer::unsubscribe)).getCause().getMessage());
    }

    private void initializeSubscriptionWithSingleTopic(KafkaConsumer<?, ?> kafkaConsumer, ConsumerRebalanceListener consumerRebalanceListener) {
        kafkaConsumer.subscribe(Collections.singleton("test"), consumerRebalanceListener);
        Assertions.assertEquals(Collections.singleton("test"), kafkaConsumer.subscription());
        Assertions.assertEquals(Collections.emptySet(), kafkaConsumer.assignment());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testManualAssignmentChangeWithAutoCommitEnabled(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        HashMap hashMap = new HashMap();
        hashMap.put("test", 1);
        hashMap.put("test2", 1);
        initMetadata(mockClient, hashMap);
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, new RangeAssignor(), true, this.groupInstanceId);
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        Node node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        this.consumer.assign(Collections.singleton(this.tp0));
        this.consumer.seekToBeginning(Collections.singleton(this.tp0));
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, 0L), Errors.NONE), node2);
        Assertions.assertEquals(0L, ((OffsetAndMetadata) this.consumer.committed(Collections.singleton(this.tp0)).get(this.tp0)).offset());
        Assertions.assertEquals(this.consumer.assignment(), Collections.singleton(this.tp0));
        mockClient.prepareResponse(listOffsetsResponse(Collections.singletonMap(this.tp0, 10L)));
        mockClient.prepareResponse(fetchResponse(this.tp0, 10L, 1));
        Assertions.assertEquals(1, this.consumer.poll(Duration.ofMillis(100L)).count());
        Assertions.assertEquals(11L, this.consumer.position(this.tp0));
        AtomicBoolean prepareOffsetCommitResponse = prepareOffsetCommitResponse(mockClient, node2, this.tp0, 11L);
        this.consumer.assign(Collections.singleton(this.t2p0));
        Assertions.assertEquals(this.consumer.assignment(), Collections.singleton(this.t2p0));
        Assertions.assertTrue(prepareOffsetCommitResponse.get());
        mockClient.requests().clear();
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testManualAssignmentChangeWithAutoCommitDisabled(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        HashMap hashMap = new HashMap();
        hashMap.put("test", 1);
        hashMap.put("test2", 1);
        initMetadata(mockClient, hashMap);
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, new RangeAssignor(), false, this.groupInstanceId);
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        Node node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        this.consumer.assign(Collections.singleton(this.tp0));
        this.consumer.seekToBeginning(Collections.singleton(this.tp0));
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, 0L), Errors.NONE), node2);
        Assertions.assertEquals(0L, ((OffsetAndMetadata) this.consumer.committed(Collections.singleton(this.tp0)).get(this.tp0)).offset());
        Assertions.assertEquals(this.consumer.assignment(), Collections.singleton(this.tp0));
        mockClient.prepareResponse(listOffsetsResponse(Collections.singletonMap(this.tp0, 10L)));
        mockClient.prepareResponse(fetchResponse(this.tp0, 10L, 1));
        Assertions.assertEquals(1, this.consumer.poll(Duration.ofMillis(1L)).count());
        Assertions.assertEquals(11L, this.consumer.position(this.tp0));
        this.consumer.assign(Collections.singleton(this.t2p0));
        Assertions.assertEquals(this.consumer.assignment(), Collections.singleton(this.t2p0));
        Iterator<ClientRequest> it = mockClient.requests().iterator();
        while (it.hasNext()) {
            Assertions.assertNotSame(it.next().requestBuilder().apiKey(), ApiKeys.OFFSET_COMMIT);
        }
        mockClient.requests().clear();
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testOffsetOfPausedPartitions(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 2));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, new RangeAssignor(), true, this.groupInstanceId);
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        Node node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        Set mkSet = Utils.mkSet(new TopicPartition[]{this.tp0, this.tp1});
        this.consumer.assign(mkSet);
        Assertions.assertEquals(mkSet, this.consumer.assignment());
        this.consumer.pause(mkSet);
        this.consumer.seekToEnd(mkSet);
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, 0L);
        hashMap.put(this.tp1, 0L);
        mockClient.prepareResponseFrom(offsetResponse(hashMap, Errors.NONE), node2);
        Assertions.assertEquals(0L, ((OffsetAndMetadata) this.consumer.committed(Collections.singleton(this.tp0)).get(this.tp0)).offset());
        hashMap.remove(this.tp0);
        hashMap.put(this.tp1, 0L);
        mockClient.prepareResponseFrom(offsetResponse(hashMap, Errors.NONE), node2);
        Assertions.assertEquals(0L, ((OffsetAndMetadata) this.consumer.committed(Collections.singleton(this.tp1)).get(this.tp1)).offset());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(this.tp0, 3L);
        hashMap2.put(this.tp1, 3L);
        mockClient.prepareResponse(listOffsetsResponse(hashMap2));
        Assertions.assertEquals(3L, this.consumer.position(this.tp0));
        Assertions.assertEquals(3L, this.consumer.position(this.tp1));
        mockClient.requests().clear();
        this.consumer.unsubscribe();
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testPollWithNoSubscription(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, null);
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.consumer.poll(Duration.ZERO);
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testPollWithEmptySubscription(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, "mock-group");
        this.consumer.subscribe(Collections.emptyList());
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.consumer.poll(Duration.ZERO);
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testPollWithEmptyUserAssignment(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, "mock-group");
        this.consumer.assign(Collections.emptySet());
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.consumer.poll(Duration.ZERO);
        });
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testGracefulClose(GroupProtocol groupProtocol) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, Errors.NONE);
        consumerCloseTest(groupProtocol, 5000L, Arrays.asList(offsetCommitResponse(hashMap), new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code())), FetchResponse.of(Errors.NONE, 0, 0, new LinkedHashMap())), 0L, false);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testCloseTimeoutDueToNoResponseForCloseFetchRequest(GroupProtocol groupProtocol) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, Errors.NONE);
        consumerCloseTest(groupProtocol, 5000L, Arrays.asList(offsetCommitResponse(hashMap), new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()))), 6000L, false);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testCloseTimeout(GroupProtocol groupProtocol) throws Exception {
        consumerCloseTest(groupProtocol, 5000L, Collections.emptyList(), 5000L, false);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testLeaveGroupTimeout(GroupProtocol groupProtocol) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, Errors.NONE);
        consumerCloseTest(groupProtocol, 5000L, Collections.singletonList(offsetCommitResponse(hashMap)), 5000L, false);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testCloseNoWait(GroupProtocol groupProtocol) throws Exception {
        consumerCloseTest(groupProtocol, 0L, Collections.emptyList(), 0L, false);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testCloseInterrupt(GroupProtocol groupProtocol) throws Exception {
        consumerCloseTest(groupProtocol, Long.MAX_VALUE, Collections.emptyList(), 0L, true);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testCloseShouldBeIdempotent(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = (MockClient) Mockito.spy(new MockClient(this.time, (Metadata) createMetadata));
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, false, this.groupInstanceId);
        this.consumer.close(Duration.ZERO);
        this.consumer.close(Duration.ZERO);
        ((MockClient) Mockito.verify(mockClient)).close();
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testOperationsBySubscribingConsumerWithDefaultGroupId(GroupProtocol groupProtocol) {
        KafkaConsumer<byte[], byte[]> newConsumer;
        Throwable th;
        KafkaConsumer<byte[], byte[]> newConsumer2;
        Throwable th2;
        KafkaConsumer<byte[], byte[]> newConsumer3;
        Throwable th3;
        try {
            newConsumer3 = newConsumer(groupProtocol, (String) null, Optional.of(Boolean.TRUE));
            th3 = null;
        } catch (InvalidConfigurationException e) {
        }
        try {
            Assertions.fail("Expected an InvalidConfigurationException");
            if (newConsumer3 != null) {
                if (0 != 0) {
                    try {
                        newConsumer3.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                } else {
                    newConsumer3.close();
                }
            }
            try {
                newConsumer2 = newConsumer(groupProtocol, (String) null);
                th2 = null;
            } catch (InvalidGroupIdException e2) {
            }
            try {
                try {
                    newConsumer2.subscribe(Collections.singleton("test"));
                    Assertions.fail("Expected an InvalidGroupIdException");
                    if (newConsumer2 != null) {
                        if (0 != 0) {
                            try {
                                newConsumer2.close();
                            } catch (Throwable th5) {
                                th2.addSuppressed(th5);
                            }
                        } else {
                            newConsumer2.close();
                        }
                    }
                    try {
                        KafkaConsumer<byte[], byte[]> newConsumer4 = newConsumer(groupProtocol, (String) null);
                        Throwable th6 = null;
                        try {
                            try {
                                newConsumer4.committed(Collections.singleton(this.tp0)).get(this.tp0);
                                Assertions.fail("Expected an InvalidGroupIdException");
                                if (newConsumer4 != null) {
                                    if (0 != 0) {
                                        try {
                                            newConsumer4.close();
                                        } catch (Throwable th7) {
                                            th6.addSuppressed(th7);
                                        }
                                    } else {
                                        newConsumer4.close();
                                    }
                                }
                            } catch (Throwable th8) {
                                th6 = th8;
                                throw th8;
                            }
                        } finally {
                            if (newConsumer4 != null) {
                                if (th6 != null) {
                                    try {
                                        newConsumer4.close();
                                    } catch (Throwable th9) {
                                        th6.addSuppressed(th9);
                                    }
                                } else {
                                    newConsumer4.close();
                                }
                            }
                        }
                    } catch (InvalidGroupIdException e3) {
                    }
                    try {
                        newConsumer = newConsumer(groupProtocol, (String) null);
                        th = null;
                    } catch (InvalidGroupIdException e4) {
                    }
                } catch (Throwable th10) {
                    th2 = th10;
                    throw th10;
                }
                try {
                    try {
                        newConsumer.commitAsync();
                        Assertions.fail("Expected an InvalidGroupIdException");
                        if (newConsumer != null) {
                            if (0 != 0) {
                                try {
                                    newConsumer.close();
                                } catch (Throwable th11) {
                                    th.addSuppressed(th11);
                                }
                            } else {
                                newConsumer.close();
                            }
                        }
                        try {
                            newConsumer = newConsumer(groupProtocol, (String) null);
                            Throwable th12 = null;
                            try {
                                try {
                                    newConsumer.commitSync();
                                    Assertions.fail("Expected an InvalidGroupIdException");
                                    if (newConsumer != null) {
                                        if (0 != 0) {
                                            try {
                                                newConsumer.close();
                                            } catch (Throwable th13) {
                                                th12.addSuppressed(th13);
                                            }
                                        } else {
                                            newConsumer.close();
                                        }
                                    }
                                } catch (Throwable th14) {
                                    th12 = th14;
                                    throw th14;
                                }
                            } finally {
                            }
                        } catch (InvalidGroupIdException e5) {
                        }
                    } catch (Throwable th15) {
                        th = th15;
                        throw th15;
                    }
                } finally {
                    if (newConsumer != null) {
                        if (th != null) {
                            try {
                                newConsumer.close();
                            } catch (Throwable th16) {
                                th.addSuppressed(th16);
                            }
                        } else {
                            newConsumer.close();
                        }
                    }
                }
            } finally {
                if (newConsumer2 != null) {
                    if (th2 != null) {
                        try {
                            newConsumer2.close();
                        } catch (Throwable th17) {
                            th2.addSuppressed(th17);
                        }
                    } else {
                        newConsumer2.close();
                    }
                }
            }
        } finally {
        }
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testOperationsByAssigningConsumerWithDefaultGroupId(GroupProtocol groupProtocol) {
        KafkaConsumer<byte[], byte[]> newConsumer = newConsumer(groupProtocol, null);
        newConsumer.assign(Collections.singleton(this.tp0));
        try {
            newConsumer.committed(Collections.singleton(this.tp0)).get(this.tp0);
            Assertions.fail("Expected an InvalidGroupIdException");
        } catch (InvalidGroupIdException e) {
        }
        try {
            newConsumer.commitAsync();
            Assertions.fail("Expected an InvalidGroupIdException");
        } catch (InvalidGroupIdException e2) {
        }
        try {
            newConsumer.commitSync();
            Assertions.fail("Expected an InvalidGroupIdException");
        } catch (InvalidGroupIdException e3) {
        }
        newConsumer.close();
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testMetricConfigRecordingLevelInfo(GroupProtocol groupProtocol) {
        Properties properties = new Properties();
        properties.put("group.protocol", groupProtocol.name());
        properties.put("bootstrap.servers", "localhost:9000");
        KafkaConsumer newConsumer = newConsumer(properties, (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer());
        Assertions.assertEquals(Sensor.RecordingLevel.INFO, newConsumer.metricsRegistry().config().recordLevel());
        newConsumer.close(Duration.ZERO);
        properties.put("metrics.recording.level", "DEBUG");
        KafkaConsumer newConsumer2 = newConsumer(properties, (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer());
        Assertions.assertEquals(Sensor.RecordingLevel.DEBUG, newConsumer2.metricsRegistry().config().recordLevel());
        newConsumer2.close(Duration.ZERO);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testShouldAttemptToRejoinGroupAfterSyncGroupFailed(GroupProtocol groupProtocol) throws Exception {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, false, this.groupInstanceId);
        this.consumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(this.consumer));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        Node node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        mockClient.prepareResponseFrom(joinGroupFollowerResponse(this.assignor, 1, "memberId", "leaderId", Errors.NONE), node2);
        mockClient.prepareResponseFrom(syncGroupResponse(Collections.singletonList(this.tp0), Errors.NONE), node2);
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 0L, 1), node);
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 1L, 0), node);
        this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        this.consumer.poll(Duration.ZERO);
        mockClient.prepareResponseFrom(abstractRequest -> {
            return true;
        }, (AbstractResponse) new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(Errors.REBALANCE_IN_PROGRESS.code())), node2);
        mockClient.prepareResponseFrom(new JoinGroupResponse(new JoinGroupResponseData().setErrorCode(Errors.NONE.code()).setGenerationId(1).setProtocolName(this.assignor.name()).setLeader("memberId").setMemberId("memberId").setMembers(Collections.singletonList(new JoinGroupResponseData.JoinGroupResponseMember().setMemberId("memberId").setMetadata(ConsumerProtocol.serializeSubscription(new ConsumerPartitionAssignor.Subscription(Collections.singletonList("test"))).array()))), ApiKeys.JOIN_GROUP.latestVersion()), node2);
        mockClient.prepareResponseFrom((AbstractResponse) syncGroupResponse(Collections.singletonList(this.tp0), Errors.NONE), node2, true);
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        mockClient.prepareResponseFrom(joinGroupFollowerResponse(this.assignor, 1, "memberId", "leaderId", Errors.NONE), node2);
        mockClient.prepareResponseFrom(syncGroupResponse(Collections.singletonList(this.tp0), Errors.NONE), node2);
        mockClient.prepareResponseFrom(abstractRequest2 -> {
            return (abstractRequest2 instanceof FetchRequest) && RequestTestUtils.fetchPartitionMap(((FetchRequest) abstractRequest2).data(), Short.valueOf(abstractRequest2.version()), this.topicNames).containsKey(new TopicIdPartition(this.topicId, this.tp0));
        }, (AbstractResponse) fetchResponse(this.tp0, 1L, 1), node);
        this.time.sleep(1000L);
        Thread.sleep(1000L);
        this.consumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        Assertions.assertFalse(this.consumer.poll(Duration.ZERO).isEmpty());
    }

    private void consumerCloseTest(GroupProtocol groupProtocol, long j, List<? extends AbstractResponse> list, long j2, boolean z) throws Exception {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, false, Optional.empty());
        newConsumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(newConsumer));
        Node prepareRebalance = prepareRebalance(mockClient, node, this.assignor, Collections.singletonList(this.tp0), null);
        mockClient.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWithIds(1, (Map<String, Integer>) Collections.singletonMap("test", 1), this.topicIds));
        newConsumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 0L, 1), node);
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 1L, 0), node);
        newConsumer.poll(Duration.ZERO);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        AtomicReference atomicReference = new AtomicReference();
        try {
            Future<?> submit = newSingleThreadExecutor.submit(() -> {
                newConsumer.commitAsync();
                try {
                    newConsumer.close(Duration.ofMillis(j));
                } catch (Exception e) {
                    atomicReference.set(e);
                }
            });
            try {
                submit.get(100L, TimeUnit.MILLISECONDS);
                if (j != 0) {
                    Assertions.fail("Close completed without waiting for commit or leave response");
                }
            } catch (TimeoutException e) {
            }
            mockClient.waitForRequests(2, 1000L);
            for (int i = 0; i < list.size(); i++) {
                mockClient.waitForRequests(1, 1000L);
                if (i == list.size() - 1 && (list.get(i) instanceof FetchResponse)) {
                    mockClient.respondFrom(list.get(i), node);
                } else {
                    mockClient.respondFrom(list.get(i), prepareRebalance);
                }
                if (i < 1) {
                    try {
                        submit.get(100L, TimeUnit.MILLISECONDS);
                        Assertions.fail("Close completed without waiting for response");
                    } catch (TimeoutException e2) {
                    }
                }
            }
            if (j2 > 0) {
                this.time.sleep(j2);
            }
            if (z) {
                Assertions.assertTrue(submit.cancel(true), "Close terminated prematurely");
                TestUtils.waitForCondition(() -> {
                    return atomicReference.get() != null;
                }, "InterruptException did not occur within timeout.");
                Assertions.assertTrue(atomicReference.get() instanceof InterruptException, "Expected exception not thrown " + atomicReference);
            } else {
                submit.get(j, TimeUnit.MILLISECONDS);
                Assertions.assertNull(atomicReference.get(), "Unexpected exception during close");
            }
        } finally {
            newSingleThreadExecutor.shutdownNow();
        }
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testPartitionsForNonExistingTopic(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Cluster fetch = createMetadata.fetch();
        mockClient.prepareResponse(RequestTestUtils.metadataResponse(fetch.nodes(), fetch.clusterResource().clusterId(), fetch.controller().id(), Collections.emptyList()));
        Assertions.assertEquals(Collections.emptyList(), newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId).partitionsFor("non-exist-topic"));
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testPartitionsForAuthenticationFailure(GroupProtocol groupProtocol) {
        KafkaConsumer<String, String> consumerWithPendingAuthenticationError = consumerWithPendingAuthenticationError(groupProtocol);
        Assertions.assertThrows(AuthenticationException.class, () -> {
            consumerWithPendingAuthenticationError.partitionsFor("some other topic");
        });
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testBeginningOffsetsAuthenticationFailure(GroupProtocol groupProtocol) {
        KafkaConsumer<String, String> consumerWithPendingAuthenticationError = consumerWithPendingAuthenticationError(groupProtocol);
        Assertions.assertThrows(AuthenticationException.class, () -> {
            consumerWithPendingAuthenticationError.beginningOffsets(Collections.singleton(this.tp0));
        });
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testEndOffsetsAuthenticationFailure(GroupProtocol groupProtocol) {
        KafkaConsumer<String, String> consumerWithPendingAuthenticationError = consumerWithPendingAuthenticationError(groupProtocol);
        Assertions.assertThrows(AuthenticationException.class, () -> {
            consumerWithPendingAuthenticationError.endOffsets(Collections.singleton(this.tp0));
        });
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testPollAuthenticationFailure(GroupProtocol groupProtocol) {
        KafkaConsumer<String, String> consumerWithPendingAuthenticationError = consumerWithPendingAuthenticationError(groupProtocol);
        consumerWithPendingAuthenticationError.subscribe(Collections.singleton("test"));
        Assertions.assertThrows(AuthenticationException.class, () -> {
            consumerWithPendingAuthenticationError.poll(Duration.ZERO);
        });
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testOffsetsForTimesAuthenticationFailure(GroupProtocol groupProtocol) {
        KafkaConsumer<String, String> consumerWithPendingAuthenticationError = consumerWithPendingAuthenticationError(groupProtocol);
        Assertions.assertThrows(AuthenticationException.class, () -> {
            consumerWithPendingAuthenticationError.offsetsForTimes(Collections.singletonMap(this.tp0, 0L));
        });
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testCommitSyncAuthenticationFailure(GroupProtocol groupProtocol) {
        KafkaConsumer<String, String> consumerWithPendingAuthenticationError = consumerWithPendingAuthenticationError(groupProtocol);
        HashMap hashMap = new HashMap();
        hashMap.put(this.tp0, new OffsetAndMetadata(10L));
        Assertions.assertThrows(AuthenticationException.class, () -> {
            consumerWithPendingAuthenticationError.commitSync(hashMap);
        });
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testCommittedAuthenticationFailure(GroupProtocol groupProtocol) {
        KafkaConsumer<String, String> consumerWithPendingAuthenticationError = consumerWithPendingAuthenticationError(groupProtocol);
        Assertions.assertThrows(AuthenticationException.class, () -> {
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testMeasureCommitSyncDurationOnFailure(GroupProtocol groupProtocol) {
        KafkaConsumer<String, String> consumerWithPendingError = consumerWithPendingError(groupProtocol, new MockTime(Duration.ofSeconds(1L).toMillis()));
        try {
            consumerWithPendingError.commitSync(Collections.singletonMap(this.tp0, new OffsetAndMetadata(10L)));
        } catch (RuntimeException e) {
        }
        Assertions.assertTrue(((Double) ((Metric) consumerWithPendingError.metrics().get(consumerWithPendingError.metricsRegistry().metricName("commit-sync-time-ns-total", "consumer-metrics"))).metricValue()).doubleValue() >= ((double) Duration.ofMillis(999L).toNanos()));
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testMeasureCommitSyncDuration(GroupProtocol groupProtocol) {
        MockTime mockTime = new MockTime(Duration.ofSeconds(1L).toMillis());
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 2));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(groupProtocol, mockTime, mockClient, subscriptionState, createMetadata, this.assignor, true, this.groupInstanceId);
        newConsumer.assign(Collections.singletonList(this.tp0));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        mockClient.prepareResponseFrom(offsetCommitResponse(Collections.singletonMap(this.tp0, Errors.NONE)), new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()));
        newConsumer.commitSync(Collections.singletonMap(this.tp0, new OffsetAndMetadata(10L)));
        Assertions.assertTrue(((Double) ((Metric) newConsumer.metrics().get(newConsumer.metricsRegistry().metricName("commit-sync-time-ns-total", "consumer-metrics"))).metricValue()).doubleValue() >= ((double) Duration.ofMillis(999L).toNanos()));
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testMeasureCommittedDurationOnFailure(GroupProtocol groupProtocol) {
        KafkaConsumer<String, String> consumerWithPendingError = consumerWithPendingError(groupProtocol, new MockTime(Duration.ofSeconds(1L).toMillis()));
        try {
            consumerWithPendingError.committed(Collections.singleton(this.tp0));
        } catch (RuntimeException e) {
        }
        Assertions.assertTrue(((Double) ((Metric) consumerWithPendingError.metrics().get(consumerWithPendingError.metricsRegistry().metricName("committed-time-ns-total", "consumer-metrics"))).metricValue()).doubleValue() >= ((double) Duration.ofMillis(999L).toNanos()));
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testMeasureCommittedDuration(GroupProtocol groupProtocol) {
        MockTime mockTime = new MockTime(Duration.ofSeconds(1L).toMillis());
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 2));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(groupProtocol, mockTime, mockClient, subscriptionState, createMetadata, this.assignor, true, this.groupInstanceId);
        newConsumer.assign(Collections.singletonList(this.tp0));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        mockClient.prepareResponseFrom(offsetResponse(Collections.singletonMap(this.tp0, 10000L), Errors.NONE), new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()));
        ((OffsetAndMetadata) newConsumer.committed(Collections.singleton(this.tp0)).get(this.tp0)).offset();
        Assertions.assertTrue(((Double) ((Metric) newConsumer.metrics().get(newConsumer.metricsRegistry().metricName("committed-time-ns-total", "consumer-metrics"))).metricValue()).doubleValue() >= ((double) Duration.ofMillis(999L).toNanos()));
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testRebalanceException(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singleton("test"), getExceptionConsumerRebalanceListener());
        Node node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        mockClient.prepareResponseFrom(joinGroupFollowerResponse(this.assignor, 1, "memberId", "leaderId", Errors.NONE), node2);
        mockClient.prepareResponseFrom(syncGroupResponse(Collections.singletonList(this.tp0), Errors.NONE), node2);
        try {
            newConsumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
            Assertions.fail("Should throw exception");
        } catch (Throwable th) {
            Assertions.assertEquals("Hit partition assign " + this.singleTopicPartition, th.getCause().getMessage());
        }
        Assertions.assertEquals(Collections.singleton(this.tp0), this.subscription.assignedPartitions());
        try {
            newConsumer.close(Duration.ofMillis(0L));
            Assertions.fail("Should throw exception");
        } catch (Throwable th2) {
            Assertions.assertEquals("Hit partition revoke " + this.singleTopicPartition, th2.getCause().getCause().getMessage());
        }
        newConsumer.close(Duration.ofMillis(0L));
        Assertions.assertTrue(this.subscription.assignedPartitions().isEmpty());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testReturnRecordsDuringRebalance(GroupProtocol groupProtocol) throws InterruptedException {
        Time mockTime = new MockTime(1L);
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(mockTime, (Metadata) createMetadata);
        ConsumerPartitionAssignor cooperativeStickyAssignor = new CooperativeStickyAssignor();
        KafkaConsumer<String, String> newConsumer = newConsumer(groupProtocol, mockTime, mockClient, this.subscription, createMetadata, cooperativeStickyAssignor, true, this.groupInstanceId);
        initMetadata(mockClient, Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test", 1), Utils.mkEntry("test2", 1), Utils.mkEntry("test3", 1)}));
        newConsumer.subscribe(Arrays.asList("test", "test2"), getConsumerRebalanceListener(newConsumer));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        Node prepareRebalance = prepareRebalance(mockClient, node, cooperativeStickyAssignor, Arrays.asList(this.tp0, this.t2p0), null);
        TestUtils.waitForCondition(() -> {
            newConsumer.poll(Duration.ofMillis(100L));
            return newConsumer.assignment().equals(Utils.mkSet(new TopicPartition[]{this.tp0, this.t2p0}));
        }, "Does not complete rebalance in time");
        Assertions.assertEquals(Utils.mkSet(new String[]{"test", "test2"}), newConsumer.subscription());
        Assertions.assertEquals(Utils.mkSet(new TopicPartition[]{this.tp0, this.t2p0}), newConsumer.assignment());
        Map<TopicPartition, FetchInfo> hashMap = new HashMap<>();
        hashMap.put(this.tp0, new FetchInfo(0L, 1));
        hashMap.put(this.t2p0, new FetchInfo(0L, 10));
        mockClient.respondFrom(fetchResponse(hashMap), node);
        Assertions.assertEquals(11, newConsumer.poll(Duration.ZERO).count());
        Assertions.assertEquals(1L, newConsumer.position(this.tp0));
        Assertions.assertEquals(10L, newConsumer.position(this.t2p0));
        hashMap.clear();
        hashMap.put(this.tp0, new FetchInfo(1L, 1));
        hashMap.put(this.t2p0, new FetchInfo(10L, 20));
        mockClient.respondFrom(fetchResponse(hashMap), node);
        newConsumer.subscribe(Arrays.asList("test", "test3"), getConsumerRebalanceListener(newConsumer));
        Assertions.assertEquals(Utils.mkSet(new String[]{"test", "test3"}), newConsumer.subscription());
        Assertions.assertEquals(Utils.mkSet(new TopicPartition[]{this.tp0, this.t2p0}), newConsumer.assignment());
        Map<TopicPartition, Long> hashMap2 = new HashMap<>();
        hashMap2.put(this.t2p0, 10L);
        AtomicBoolean prepareOffsetCommitResponse = prepareOffsetCommitResponse(mockClient, prepareRebalance, hashMap2);
        ConsumerRecords poll = newConsumer.poll(Duration.ZERO);
        hashMap.clear();
        hashMap.put(this.tp0, new FetchInfo(2L, 1));
        mockClient.respondFrom(fetchResponse(hashMap), node);
        Assertions.assertEquals(Utils.mkSet(new String[]{"test", "test3"}), newConsumer.subscription());
        Assertions.assertEquals(Collections.singleton(this.tp0), newConsumer.assignment());
        Assertions.assertEquals(1, poll.count());
        Assertions.assertEquals(2L, newConsumer.position(this.tp0));
        Assertions.assertTrue(prepareOffsetCommitResponse.get());
        mockClient.respondFrom(joinGroupFollowerResponse(cooperativeStickyAssignor, 2, "memberId", "leaderId", Errors.NONE), prepareRebalance);
        ConsumerRecords poll2 = newConsumer.poll(Duration.ZERO);
        Assertions.assertEquals(Utils.mkSet(new String[]{"test", "test3"}), newConsumer.subscription());
        Assertions.assertEquals(Collections.singleton(this.tp0), newConsumer.assignment());
        Assertions.assertEquals(1, poll2.count());
        Assertions.assertEquals(3L, newConsumer.position(this.tp0));
        hashMap.clear();
        hashMap.put(this.tp0, new FetchInfo(3L, 1));
        mockClient.respondFrom(fetchResponse(hashMap), node);
        mockClient.respondFrom(syncGroupResponse(Arrays.asList(this.tp0, this.t3p0), Errors.NONE), prepareRebalance);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        TestUtils.waitForCondition(() -> {
            return newConsumer.assignment().equals(Utils.mkSet(new TopicPartition[]{this.tp0, this.t3p0})) && atomicInteger.addAndGet(newConsumer.poll(Duration.ofMillis(100L)).count()) == 1;
        }, "Does not complete rebalance in time");
        Assertions.assertEquals(Utils.mkSet(new String[]{"test", "test3"}), newConsumer.subscription());
        Assertions.assertEquals(Utils.mkSet(new TopicPartition[]{this.tp0, this.t3p0}), newConsumer.assignment());
        Assertions.assertEquals(4L, newConsumer.position(this.tp0));
        Assertions.assertEquals(0L, newConsumer.position(this.t3p0));
        hashMap.clear();
        hashMap.put(this.tp0, new FetchInfo(4L, 1));
        hashMap.put(this.t3p0, new FetchInfo(0L, 100));
        mockClient.respondFrom(fetchResponse(hashMap), node);
        atomicInteger.set(0);
        TestUtils.waitForCondition(() -> {
            return atomicInteger.addAndGet(newConsumer.poll(Duration.ofMillis(100L)).count()) == 101;
        }, "Does not complete rebalance in time");
        Assertions.assertEquals(5L, newConsumer.position(this.tp0));
        Assertions.assertEquals(100L, newConsumer.position(this.t3p0));
        mockClient.requests().clear();
        newConsumer.unsubscribe();
        newConsumer.close(Duration.ZERO);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testGetGroupMetadata(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        KafkaConsumer<String, String> newConsumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        ConsumerGroupMetadata groupMetadata = newConsumer.groupMetadata();
        Assertions.assertEquals("mock-group", groupMetadata.groupId());
        Assertions.assertEquals("", groupMetadata.memberId());
        Assertions.assertEquals(-1, groupMetadata.generationId());
        Assertions.assertEquals(this.groupInstanceId, groupMetadata.groupInstanceId());
        newConsumer.subscribe(Collections.singleton("test"), getConsumerRebalanceListener(newConsumer));
        prepareRebalance(mockClient, node, this.assignor, Collections.singletonList(this.tp0), null);
        mockClient.prepareResponseFrom(fetchResponse(this.tp0, 0L, 0), node);
        newConsumer.updateAssignmentMetadataIfNeeded(this.time.timer(Long.MAX_VALUE));
        ConsumerGroupMetadata groupMetadata2 = newConsumer.groupMetadata();
        Assertions.assertEquals("mock-group", groupMetadata2.groupId());
        Assertions.assertEquals("memberId", groupMetadata2.memberId());
        Assertions.assertEquals(1, groupMetadata2.generationId());
        Assertions.assertEquals(this.groupInstanceId, groupMetadata2.groupInstanceId());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testInvalidGroupMetadata(GroupProtocol groupProtocol) throws InterruptedException {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        KafkaConsumer<String, String> newConsumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, new RoundRobinAssignor(), true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singletonList("test"));
        mockClient.enableBlockingUntilWakeup(1);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        newSingleThreadExecutor.execute(() -> {
            newConsumer.poll(Duration.ofSeconds(5L));
        });
        try {
            TimeUnit.SECONDS.sleep(1L);
            newConsumer.getClass();
            Assertions.assertThrows(ConcurrentModificationException.class, newConsumer::groupMetadata);
            mockClient.wakeup();
            newConsumer.wakeup();
            newSingleThreadExecutor.shutdown();
            Assertions.assertTrue(newSingleThreadExecutor.awaitTermination(10L, TimeUnit.SECONDS));
            newConsumer.close(Duration.ZERO);
            newConsumer.getClass();
            Assertions.assertThrows(IllegalStateException.class, newConsumer::groupMetadata);
        } catch (Throwable th) {
            newSingleThreadExecutor.shutdown();
            Assertions.assertTrue(newSingleThreadExecutor.awaitTermination(10L, TimeUnit.SECONDS));
            throw th;
        }
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testCurrentLag(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.consumer.currentLag(this.tp0);
        });
        this.consumer.assign(Collections.singleton(this.tp0));
        this.consumer.poll(Duration.ofMillis(0L));
        mockClient.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", (Node) createMetadata.fetch().nodes().get(0)));
        Assertions.assertEquals(OptionalLong.empty(), this.consumer.currentLag(this.tp0));
        Assertions.assertEquals(0, mockClient.inFlightRequestCount());
        this.consumer.seek(this.tp0, 50L);
        this.consumer.poll(Duration.ofMillis(0L));
        Assertions.assertEquals(2, mockClient.inFlightRequestCount());
        Assertions.assertEquals(OptionalLong.empty(), this.consumer.currentLag(this.tp0));
        mockClient.respond(listOffsetsResponse(Collections.singletonMap(this.tp0, 90L)));
        this.consumer.poll(Duration.ofMillis(0L));
        Assertions.assertEquals(OptionalLong.of(40L), this.consumer.currentLag(this.tp0));
        Assertions.assertEquals(1, mockClient.inFlightRequestCount());
        mockClient.respond(fetchResponse(Collections.singletonMap(this.tp0, new FetchInfo(1L, 99L, 50L, 5))));
        Assertions.assertEquals(5, this.consumer.poll(Duration.ofMillis(1L)).count());
        Assertions.assertEquals(55L, this.consumer.position(this.tp0));
        Assertions.assertEquals(OptionalLong.of(45L), this.consumer.currentLag(this.tp0));
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testListOffsetShouldUpdateSubscriptions(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        this.consumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        this.consumer.assign(Collections.singleton(this.tp0));
        this.consumer.poll(Duration.ofMillis(0L));
        mockClient.respond(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", (Node) createMetadata.fetch().nodes().get(0)));
        this.consumer.seek(this.tp0, 50L);
        mockClient.prepareResponse(listOffsetsResponse(Collections.singletonMap(this.tp0, 90L)));
        Assertions.assertEquals(Collections.singletonMap(this.tp0, 90L), this.consumer.endOffsets(Collections.singleton(this.tp0)));
        Assertions.assertEquals(OptionalLong.of(40L), this.consumer.currentLag(this.tp0));
    }

    private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(GroupProtocol groupProtocol, Time time) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        RangeAssignor rangeAssignor = new RangeAssignor();
        mockClient.createPendingAuthenticationError(node, 0L);
        return newConsumer(groupProtocol, time, mockClient, this.subscription, createMetadata, rangeAssignor, false, this.groupInstanceId);
    }

    private KafkaConsumer<String, String> consumerWithPendingAuthenticationError(GroupProtocol groupProtocol) {
        return consumerWithPendingAuthenticationError(groupProtocol, new MockTime());
    }

    private KafkaConsumer<String, String> consumerWithPendingError(GroupProtocol groupProtocol, Time time) {
        return consumerWithPendingAuthenticationError(groupProtocol, time);
    }

    private ConsumerRebalanceListener getConsumerRebalanceListener(final KafkaConsumer<?, ?> kafkaConsumer) {
        return new ConsumerRebalanceListener() { // from class: org.apache.kafka.clients.consumer.KafkaConsumerTest.2
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                Iterator<TopicPartition> it = collection.iterator();
                while (it.hasNext()) {
                    kafkaConsumer.seek(it.next(), 0L);
                }
            }
        };
    }

    private ConsumerRebalanceListener getExceptionConsumerRebalanceListener() {
        return new ConsumerRebalanceListener() { // from class: org.apache.kafka.clients.consumer.KafkaConsumerTest.3
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                throw new RuntimeException("Hit partition revoke " + collection);
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                throw new RuntimeException("Hit partition assign " + collection);
            }

            public void onPartitionsLost(Collection<TopicPartition> collection) {
                throw new RuntimeException("Hit partition lost " + collection);
            }
        };
    }

    private ConsumerMetadata createMetadata(SubscriptionState subscriptionState) {
        return new ConsumerMetadata(0L, 0L, Long.MAX_VALUE, false, false, subscriptionState, new LogContext(), new ClusterResourceListeners());
    }

    private Node prepareRebalance(MockClient mockClient, Node node, Set<String> set, ConsumerPartitionAssignor consumerPartitionAssignor, List<TopicPartition> list, Node node2) {
        if (node2 == null) {
            mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
            node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        }
        mockClient.prepareResponseFrom(abstractRequest -> {
            Iterator it = ((JoinGroupRequest) abstractRequest).data().protocols().iterator();
            Assertions.assertTrue(it.hasNext());
            return set.equals(new HashSet(ConsumerProtocol.deserializeSubscription(ByteBuffer.wrap(((JoinGroupRequestData.JoinGroupRequestProtocol) it.next()).metadata())).topics()));
        }, (AbstractResponse) joinGroupFollowerResponse(consumerPartitionAssignor, 1, "memberId", "leaderId", Errors.NONE), node2);
        mockClient.prepareResponseFrom(syncGroupResponse(list, Errors.NONE), node2);
        return node2;
    }

    private Node prepareRebalance(MockClient mockClient, Node node, ConsumerPartitionAssignor consumerPartitionAssignor, List<TopicPartition> list, Node node2) {
        if (node2 == null) {
            mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
            node2 = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        }
        mockClient.prepareResponseFrom(joinGroupFollowerResponse(consumerPartitionAssignor, 1, "memberId", "leaderId", Errors.NONE), node2);
        mockClient.prepareResponseFrom(syncGroupResponse(list, Errors.NONE), node2);
        return node2;
    }

    private AtomicBoolean prepareHeartbeatResponse(MockClient mockClient, Node node, Errors errors) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        mockClient.prepareResponseFrom(abstractRequest -> {
            atomicBoolean.set(true);
            return true;
        }, (AbstractResponse) new HeartbeatResponse(new HeartbeatResponseData().setErrorCode(errors.code())), node);
        return atomicBoolean;
    }

    private AtomicBoolean prepareOffsetCommitResponse(MockClient mockClient, Node node, Map<TopicPartition, Long> map) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        HashMap hashMap = new HashMap();
        Iterator<TopicPartition> it = map.keySet().iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), Errors.NONE);
        }
        mockClient.prepareResponseFrom(abstractRequest -> {
            Map offsets = ((OffsetCommitRequest) abstractRequest).offsets();
            for (Map.Entry entry : map.entrySet()) {
                if (!((Long) offsets.get(entry.getKey())).equals(entry.getValue())) {
                    atomicBoolean.set(false);
                    return false;
                }
            }
            return true;
        }, (AbstractResponse) offsetCommitResponse(hashMap), node);
        return atomicBoolean;
    }

    private AtomicBoolean prepareOffsetCommitResponse(MockClient mockClient, Node node, TopicPartition topicPartition, long j) {
        return prepareOffsetCommitResponse(mockClient, node, Collections.singletonMap(topicPartition, Long.valueOf(j)));
    }

    private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Errors> map) {
        return new OffsetCommitResponse(map);
    }

    private JoinGroupResponse joinGroupFollowerResponse(ConsumerPartitionAssignor consumerPartitionAssignor, int i, String str, String str2, Errors errors) {
        return new JoinGroupResponse(new JoinGroupResponseData().setErrorCode(errors.code()).setGenerationId(i).setProtocolName(consumerPartitionAssignor.name()).setLeader(str2).setMemberId(str).setMembers(Collections.emptyList()), ApiKeys.JOIN_GROUP.latestVersion());
    }

    private SyncGroupResponse syncGroupResponse(List<TopicPartition> list, Errors errors) {
        return new SyncGroupResponse(new SyncGroupResponseData().setErrorCode(errors.code()).setAssignment(Utils.toArray(ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(list)))));
    }

    private OffsetFetchResponse offsetResponse(Map<TopicPartition, Long> map, Errors errors) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), new OffsetFetchResponse.PartitionData(entry.getValue().longValue(), Optional.empty(), "", errors));
        }
        return new OffsetFetchResponse(10, Collections.singletonMap("mock-group", Errors.NONE), Collections.singletonMap("mock-group", hashMap));
    }

    private ListOffsetsResponse listOffsetsResponse(Map<TopicPartition, Long> map) {
        return listOffsetsResponse(map, Collections.emptyMap());
    }

    private ListOffsetsResponse listOffsetsResponse(Map<TopicPartition, Long> map, Map<TopicPartition, Errors> map2) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            ((ListOffsetsResponseData.ListOffsetsTopicResponse) hashMap.computeIfAbsent(key.topic(), str -> {
                return new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(key.topic());
            })).partitions().add(new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(key.partition()).setErrorCode(Errors.NONE.code()).setTimestamp(-1L).setOffset(entry.getValue().longValue()));
        }
        for (Map.Entry<TopicPartition, Errors> entry2 : map2.entrySet()) {
            TopicPartition key2 = entry2.getKey();
            ((ListOffsetsResponseData.ListOffsetsTopicResponse) hashMap.computeIfAbsent(key2.topic(), str2 -> {
                return new ListOffsetsResponseData.ListOffsetsTopicResponse().setName(key2.topic());
            })).partitions().add(new ListOffsetsResponseData.ListOffsetsPartitionResponse().setPartitionIndex(key2.partition()).setErrorCode(entry2.getValue().code()).setTimestamp(-1L).setOffset(-1L));
        }
        return new ListOffsetsResponse(new ListOffsetsResponseData().setTopics(new ArrayList(hashMap.values())));
    }

    private FetchResponse fetchResponse(Map<TopicPartition, FetchInfo> map) {
        MemoryRecords build;
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (Map.Entry<TopicPartition, FetchInfo> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            long j = entry.getValue().offset;
            int i = entry.getValue().count;
            long j2 = entry.getValue().logLastOffset + 1;
            long j3 = entry.getValue().logFirstOffset;
            if (i == 0) {
                build = MemoryRecords.EMPTY;
            } else {
                MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(KafkaChannelTest.MAX_RECEIVE_SIZE), CompressionType.NONE, TimestampType.CREATE_TIME, j);
                Throwable th = null;
                for (int i2 = 0; i2 < i; i2++) {
                    try {
                        try {
                            builder.append(0L, ("key-" + i2).getBytes(), ("value-" + i2).getBytes());
                        } catch (Throwable th2) {
                            if (builder != null) {
                                if (th != null) {
                                    try {
                                        builder.close();
                                    } catch (Throwable th3) {
                                        th.addSuppressed(th3);
                                    }
                                } else {
                                    builder.close();
                                }
                            }
                            throw th2;
                        }
                    } finally {
                    }
                }
                build = builder.build();
                if (builder != null) {
                    if (0 != 0) {
                        try {
                            builder.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        builder.close();
                    }
                }
            }
            linkedHashMap.put(new TopicIdPartition(this.topicIds.get(key.topic()), key), new FetchResponseData.PartitionData().setPartitionIndex(key.partition()).setHighWatermark(j2).setLogStartOffset(j3).setRecords(build));
        }
        return FetchResponse.of(Errors.NONE, 0, 0, linkedHashMap);
    }

    private FetchResponse fetchResponse(TopicPartition topicPartition, long j, int i) {
        return fetchResponse(Collections.singletonMap(topicPartition, new FetchInfo(j, i)));
    }

    private KafkaConsumer<String, String> newConsumer(GroupProtocol groupProtocol, Time time, KafkaClient kafkaClient, SubscriptionState subscriptionState, ConsumerMetadata consumerMetadata, ConsumerPartitionAssignor consumerPartitionAssignor, boolean z, Optional<String> optional) {
        return newConsumer(groupProtocol, time, kafkaClient, subscriptionState, consumerMetadata, consumerPartitionAssignor, z, "mock-group", optional, false);
    }

    private KafkaConsumer<String, String> newConsumerNoAutoCommit(GroupProtocol groupProtocol, Time time, KafkaClient kafkaClient, SubscriptionState subscriptionState, ConsumerMetadata consumerMetadata) {
        return newConsumer(groupProtocol, time, kafkaClient, subscriptionState, consumerMetadata, new RangeAssignor(), false, "mock-group", this.groupInstanceId, false);
    }

    private KafkaConsumer<String, String> newConsumer(GroupProtocol groupProtocol, Time time, KafkaClient kafkaClient, SubscriptionState subscriptionState, ConsumerMetadata consumerMetadata, ConsumerPartitionAssignor consumerPartitionAssignor, boolean z, String str, Optional<String> optional, boolean z2) {
        return newConsumer(groupProtocol, time, kafkaClient, subscriptionState, consumerMetadata, consumerPartitionAssignor, z, str, optional, Optional.of(new StringDeserializer()), z2);
    }

    private KafkaConsumer<String, String> newConsumer(GroupProtocol groupProtocol, Time time, KafkaClient kafkaClient, SubscriptionState subscriptionState, ConsumerMetadata consumerMetadata, ConsumerPartitionAssignor consumerPartitionAssignor, boolean z, String str, Optional<String> optional, Optional<Deserializer<String>> optional2, boolean z2) {
        StringDeserializer stringDeserializer = new StringDeserializer();
        Deserializer<String> orElse = optional2.orElse(new StringDeserializer());
        return new KafkaConsumer<>(new LogContext(), time, newConsumerConfig(groupProtocol, z, str, optional, orElse, z2), stringDeserializer, orElse, kafkaClient, subscriptionState, consumerMetadata, Collections.singletonList(consumerPartitionAssignor));
    }

    private ConsumerConfig newConsumerConfig(GroupProtocol groupProtocol, boolean z, String str, Optional<String> optional, Deserializer<String> deserializer, boolean z2) {
        HashMap hashMap = new HashMap();
        hashMap.put("auto.commit.interval.ms", 500);
        hashMap.put("check.crcs", true);
        hashMap.put("client.id", "mock-consumer");
        hashMap.put("client.rack", "");
        hashMap.put("default.api.timeout.ms", 60000);
        hashMap.put("enable.auto.commit", Boolean.valueOf(z));
        hashMap.put("fetch.max.bytes", Integer.MAX_VALUE);
        hashMap.put("fetch.max.wait.ms", 500);
        hashMap.put("fetch.min.bytes", 1);
        hashMap.put("group.id", str);
        hashMap.put("group.protocol", groupProtocol.name());
        hashMap.put("heartbeat.interval.ms", 1000);
        hashMap.put("isolation.level", IsolationLevel.READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
        hashMap.put("key.deserializer", StringDeserializer.class);
        hashMap.put("max.partition.fetch.bytes", 1048576);
        hashMap.put("max.poll.interval.ms", 60000);
        hashMap.put("max.poll.records", Integer.MAX_VALUE);
        hashMap.put("request.timeout.ms", 30000);
        hashMap.put("retry.backoff.max.ms", 1000L);
        hashMap.put("retry.backoff.ms", 100L);
        hashMap.put("session.timeout.ms", 10000);
        hashMap.put("internal.throw.on.fetch.stable.offset.unsupported", Boolean.valueOf(z2));
        hashMap.put("value.deserializer", deserializer.getClass());
        optional.ifPresent(str2 -> {
            hashMap.put("group.instance.id", str2);
        });
        return new ConsumerConfig(hashMap);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testSubscriptionOnInvalidTopic(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        Cluster fetch = createMetadata.fetch();
        ArrayList arrayList = new ArrayList();
        arrayList.add(new MetadataResponse.TopicMetadata(Errors.INVALID_TOPIC_EXCEPTION, "topic abc", false, Collections.emptyList()));
        mockClient.prepareMetadataUpdate(RequestTestUtils.metadataResponse(fetch.nodes(), fetch.clusterResource().clusterId(), fetch.controller().id(), arrayList));
        KafkaConsumer<String, String> newConsumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singleton("topic abc"), getConsumerRebalanceListener(newConsumer));
        Assertions.assertThrows(InvalidTopicException.class, () -> {
            newConsumer.poll(Duration.ZERO);
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testPollTimeMetrics(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        KafkaConsumer<String, String> newConsumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singletonList("test"));
        Metrics metricsRegistry = newConsumer.metricsRegistry();
        MetricName metricName = metricsRegistry.metricName("last-poll-seconds-ago", "consumer-metrics");
        MetricName metricName2 = metricsRegistry.metricName("time-between-poll-avg", "consumer-metrics");
        MetricName metricName3 = metricsRegistry.metricName("time-between-poll-max", "consumer-metrics");
        Assertions.assertEquals(Double.valueOf(-1.0d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        Assertions.assertEquals(Double.valueOf(Double.NaN), ((Metric) newConsumer.metrics().get(metricName2)).metricValue());
        Assertions.assertEquals(Double.valueOf(Double.NaN), ((Metric) newConsumer.metrics().get(metricName3)).metricValue());
        newConsumer.poll(Duration.ZERO);
        Assertions.assertEquals(Double.valueOf(0.0d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), ((Metric) newConsumer.metrics().get(metricName2)).metricValue());
        Assertions.assertEquals(Double.valueOf(0.0d), ((Metric) newConsumer.metrics().get(metricName3)).metricValue());
        this.time.sleep(5000L);
        Assertions.assertEquals(Double.valueOf(5.0d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        newConsumer.poll(Duration.ZERO);
        Assertions.assertEquals(Double.valueOf(2500.0d), ((Metric) newConsumer.metrics().get(metricName2)).metricValue());
        Assertions.assertEquals(Double.valueOf(5000.0d), ((Metric) newConsumer.metrics().get(metricName3)).metricValue());
        this.time.sleep(10000L);
        Assertions.assertEquals(Double.valueOf(10.0d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        newConsumer.poll(Duration.ZERO);
        Assertions.assertEquals(Double.valueOf(5000.0d), ((Metric) newConsumer.metrics().get(metricName2)).metricValue());
        Assertions.assertEquals(Double.valueOf(10000.0d), ((Metric) newConsumer.metrics().get(metricName3)).metricValue());
        this.time.sleep(5000L);
        Assertions.assertEquals(Double.valueOf(5.0d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        newConsumer.poll(Duration.ZERO);
        Assertions.assertEquals(Double.valueOf(5000.0d), ((Metric) newConsumer.metrics().get(metricName2)).metricValue());
        Assertions.assertEquals(Double.valueOf(10000.0d), ((Metric) newConsumer.metrics().get(metricName3)).metricValue());
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testPollIdleRatio(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        KafkaConsumer<String, String> newConsumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        MetricName metricName = newConsumer.metricsRegistry().metricName("poll-idle-ratio-avg", "consumer-metrics");
        Assertions.assertEquals(Double.valueOf(Double.NaN), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        newConsumer.kafkaConsumerMetrics().recordPollStart(this.time.milliseconds());
        this.time.sleep(50L);
        newConsumer.kafkaConsumerMetrics().recordPollEnd(this.time.milliseconds());
        Assertions.assertEquals(Double.valueOf(1.0d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        this.time.sleep(50L);
        newConsumer.kafkaConsumerMetrics().recordPollStart(this.time.milliseconds());
        newConsumer.kafkaConsumerMetrics().recordPollEnd(this.time.milliseconds());
        Assertions.assertEquals(Double.valueOf(0.5d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
        this.time.sleep(25L);
        newConsumer.kafkaConsumerMetrics().recordPollStart(this.time.milliseconds());
        this.time.sleep(25L);
        newConsumer.kafkaConsumerMetrics().recordPollEnd(this.time.milliseconds());
        Assertions.assertEquals(Double.valueOf(0.5d), ((Metric) newConsumer.metrics().get(metricName)).metricValue());
    }

    private static boolean consumerMetricPresent(KafkaConsumer<String, String> kafkaConsumer, String str) {
        return kafkaConsumer.metricsRegistry().metrics().containsKey(new MetricName(str, "consumer-metrics", "", Collections.emptyMap()));
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupProtocol) {
        MockTime mockTime = new MockTime(1L);
        SubscriptionState subscriptionState = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
        ConsumerMetadata createMetadata = createMetadata(subscriptionState);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        KafkaConsumer<String, String> newConsumer = newConsumer(groupProtocol, mockTime, mockClient, subscriptionState, createMetadata, new RoundRobinAssignor(), true, this.groupInstanceId);
        newConsumer.subscribe(Collections.singletonList("test"));
        Assertions.assertTrue(consumerMetricPresent(newConsumer, "last-poll-seconds-ago"));
        Assertions.assertTrue(consumerMetricPresent(newConsumer, "time-between-poll-avg"));
        Assertions.assertTrue(consumerMetricPresent(newConsumer, "time-between-poll-max"));
        newConsumer.close();
        Assertions.assertFalse(consumerMetricPresent(newConsumer, "last-poll-seconds-ago"));
        Assertions.assertFalse(consumerMetricPresent(newConsumer, "time-between-poll-avg"));
        Assertions.assertFalse(consumerMetricPresent(newConsumer, "time-between-poll-max"));
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testEnforceRebalanceWithManualAssignment(GroupProtocol groupProtocol) {
        this.consumer = newConsumer(groupProtocol, null);
        this.consumer.assign(Collections.singleton(new TopicPartition("topic", 0)));
        KafkaConsumer<?, ?> kafkaConsumer = this.consumer;
        kafkaConsumer.getClass();
        Assertions.assertThrows(IllegalStateException.class, kafkaConsumer::enforceRebalance);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testEnforceRebalanceTriggersRebalanceOnNextPoll(GroupProtocol groupProtocol) {
        Time mockTime = new MockTime(1L);
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(mockTime, (Metadata) createMetadata);
        KafkaConsumer<String, String> newConsumer = newConsumer(groupProtocol, mockTime, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        MockRebalanceListener mockRebalanceListener = new MockRebalanceListener();
        initMetadata(mockClient, Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test", 1), Utils.mkEntry("test2", 1), Utils.mkEntry("test3", 1)}));
        newConsumer.subscribe(Arrays.asList("test", "test2"), mockRebalanceListener);
        prepareRebalance(mockClient, (Node) createMetadata.fetch().nodes().get(0), this.assignor, Arrays.asList(this.tp0, this.t2p0), null);
        newConsumer.poll(Duration.ZERO);
        newConsumer.poll(Duration.ZERO);
        Assertions.assertEquals(mockRebalanceListener.revokedCount, 0);
        Assertions.assertEquals(mockRebalanceListener.assignedCount, 1);
        newConsumer.enforceRebalance();
        newConsumer.poll(Duration.ZERO);
        Assertions.assertEquals(mockRebalanceListener.revokedCount, 1);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testEnforceRebalanceReason(GroupProtocol groupProtocol) {
        MockTime mockTime = new MockTime(1L);
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test", 1)}));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, mockTime, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        this.consumer.subscribe(Collections.singletonList("test"));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        this.consumer.poll(Duration.ZERO);
        prepareJoinGroupAndVerifyReason(mockClient, node, "");
        this.consumer.poll(Duration.ZERO);
        this.consumer.enforceRebalance((String) null);
        prepareJoinGroupAndVerifyReason(mockClient, node, "rebalance enforced by user");
        this.consumer.poll(Duration.ZERO);
        this.consumer.enforceRebalance("");
        prepareJoinGroupAndVerifyReason(mockClient, node, "rebalance enforced by user");
        this.consumer.poll(Duration.ZERO);
        this.consumer.enforceRebalance("user provided reason");
        prepareJoinGroupAndVerifyReason(mockClient, node, "user provided reason");
        this.consumer.poll(Duration.ZERO);
    }

    private void prepareJoinGroupAndVerifyReason(MockClient mockClient, Node node, String str) {
        mockClient.prepareResponseFrom(abstractRequest -> {
            return str.equals(((JoinGroupRequest) abstractRequest).data().reason());
        }, (AbstractResponse) joinGroupFollowerResponse(this.assignor, 1, "memberId", "leaderId", Errors.NONE), node);
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void configurableObjectsShouldSeeGeneratedClientId(GroupProtocol groupProtocol) {
        Properties properties = new Properties();
        properties.put("group.protocol", groupProtocol.name());
        properties.put("bootstrap.servers", "localhost:9999");
        properties.put("key.deserializer", DeserializerForClientId.class.getName());
        properties.put("value.deserializer", DeserializerForClientId.class.getName());
        properties.put("interceptor.classes", ConsumerInterceptorForClientId.class.getName());
        this.consumer = newConsumer(properties);
        Assertions.assertNotNull(this.consumer.clientId());
        Assertions.assertNotEquals(0, this.consumer.clientId().length());
        Assertions.assertEquals(3, CLIENT_IDS.size());
        CLIENT_IDS.forEach(str -> {
            Assertions.assertEquals(str, this.consumer.clientId());
        });
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testUnusedConfigs(GroupProtocol groupProtocol) {
        HashMap hashMap = new HashMap();
        hashMap.put("group.protocol", groupProtocol.name());
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("ssl.protocol", "TLS");
        ConsumerConfig consumerConfig = new ConsumerConfig(ConsumerConfig.appendDeserializerToConfig(hashMap, new StringDeserializer(), new StringDeserializer()));
        Assertions.assertTrue(consumerConfig.unused().contains("ssl.protocol"));
        this.consumer = new KafkaConsumer<>(consumerConfig, (Deserializer) null, (Deserializer) null);
        Assertions.assertTrue(consumerConfig.unused().contains("ssl.protocol"));
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testAssignorNameConflict(GroupProtocol groupProtocol) {
        HashMap hashMap = new HashMap();
        hashMap.put("group.protocol", groupProtocol.name());
        hashMap.put("bootstrap.servers", "localhost:9999");
        hashMap.put("partition.assignment.strategy", Arrays.asList(RangeAssignor.class.getName(), ConsumerPartitionAssignorTest.TestConsumerPartitionAssignor.class.getName()));
        Assertions.assertThrows(KafkaException.class, () -> {
            newConsumer((Map<String, Object>) hashMap, (Deserializer) new StringDeserializer(), (Deserializer) new StringDeserializer());
        });
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testOffsetsForTimesTimeout(GroupProtocol groupProtocol) {
        KafkaConsumer<String, String> consumerForCheckingTimeoutException = consumerForCheckingTimeoutException(groupProtocol);
        Assertions.assertEquals("Failed to get offsets by times in 60000ms", Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> {
            consumerForCheckingTimeoutException.offsetsForTimes(Collections.singletonMap(this.tp0, 0L));
        }).getMessage());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testBeginningOffsetsTimeout(GroupProtocol groupProtocol) {
        KafkaConsumer<String, String> consumerForCheckingTimeoutException = consumerForCheckingTimeoutException(groupProtocol);
        Assertions.assertEquals("Failed to get offsets by times in 60000ms", Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> {
            consumerForCheckingTimeoutException.beginningOffsets(Collections.singletonList(this.tp0));
        }).getMessage());
    }

    @EnumSource(value = GroupProtocol.class, names = {"CLASSIC"})
    @ParameterizedTest
    public void testEndOffsetsTimeout(GroupProtocol groupProtocol) {
        KafkaConsumer<String, String> consumerForCheckingTimeoutException = consumerForCheckingTimeoutException(groupProtocol);
        Assertions.assertEquals("Failed to get offsets by times in 60000ms", Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> {
            consumerForCheckingTimeoutException.endOffsets(Collections.singletonList(this.tp0));
        }).getMessage());
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testClientInstanceId() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9999");
        ClientTelemetryReporter clientTelemetryReporter = (ClientTelemetryReporter) Mockito.mock(ClientTelemetryReporter.class);
        clientTelemetryReporter.configure((Map) ArgumentMatchers.any());
        MockedStatic mockStatic = Mockito.mockStatic(CommonClientConfigs.class, new CallsRealMethods());
        mockStatic.when(() -> {
            CommonClientConfigs.telemetryReporter(ArgumentMatchers.anyString(), (AbstractConfig) ArgumentMatchers.any());
        }).thenReturn(Optional.of(clientTelemetryReporter));
        ClientTelemetrySender clientTelemetrySender = (ClientTelemetrySender) Mockito.mock(ClientTelemetrySender.class);
        Uuid randomUuid = Uuid.randomUuid();
        Mockito.when(clientTelemetryReporter.telemetrySender()).thenReturn(clientTelemetrySender);
        Mockito.when(clientTelemetrySender.clientInstanceId((Duration) ArgumentMatchers.any())).thenReturn(Optional.of(randomUuid));
        this.consumer = newConsumer(properties, (Deserializer) new StringDeserializer(), (Deserializer) new StringDeserializer());
        Assertions.assertEquals(randomUuid, this.consumer.clientInstanceId(Duration.ofMillis(0L)));
        mockStatic.close();
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testClientInstanceIdInvalidTimeout() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9999");
        this.consumer = newConsumer(properties, (Deserializer) new StringDeserializer(), (Deserializer) new StringDeserializer());
        Assertions.assertEquals("The timeout cannot be negative.", ((Exception) Assertions.assertThrows(IllegalArgumentException.class, () -> {
            this.consumer.clientInstanceId(Duration.ofMillis(-1L));
        })).getMessage());
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testClientInstanceIdNoTelemetryReporterRegistered() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty("enable.metrics.push", "false");
        this.consumer = newConsumer(properties, (Deserializer) new StringDeserializer(), (Deserializer) new StringDeserializer());
        Assertions.assertEquals("Telemetry is not enabled. Set config `enable.metrics.push` to `true`.", ((Exception) Assertions.assertThrows(IllegalStateException.class, () -> {
            this.consumer.clientInstanceId(Duration.ofMillis(0L));
        })).getMessage());
    }

    private KafkaConsumer<String, String> consumerForCheckingTimeoutException(GroupProtocol groupProtocol) {
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient(this.time, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 1));
        KafkaConsumer<String, String> newConsumer = newConsumer(groupProtocol, this.time, mockClient, this.subscription, createMetadata, new RangeAssignor(), false, this.groupInstanceId);
        for (int i = 0; i < 10; i++) {
            mockClient.prepareResponse(abstractRequest -> {
                this.time.sleep(6000L);
                return abstractRequest instanceof ListOffsetsRequest;
            }, (AbstractResponse) listOffsetsResponse(Collections.emptyMap(), Collections.singletonMap(this.tp0, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
        }
        return newConsumer;
    }

    @EnumSource(GroupProtocol.class)
    @ParameterizedTest
    public void testCommittedThrowsTimeoutExceptionForNoResponse(GroupProtocol groupProtocol) {
        MockTime mockTime = new MockTime(Duration.ofSeconds(1L).toMillis());
        ConsumerMetadata createMetadata = createMetadata(this.subscription);
        MockClient mockClient = new MockClient((Time) mockTime, (Metadata) createMetadata);
        initMetadata(mockClient, Collections.singletonMap("test", 2));
        Node node = (Node) createMetadata.fetch().nodes().get(0);
        this.consumer = newConsumer(groupProtocol, mockTime, mockClient, this.subscription, createMetadata, this.assignor, true, this.groupInstanceId);
        this.consumer.assign(Collections.singletonList(this.tp0));
        mockClient.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, "mock-group", node), node);
        mockClient.prepareResponseFrom((AbstractResponse) offsetResponse(Collections.singletonMap(this.tp0, 0L), Errors.NONE), new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port()), true);
        Assertions.assertEquals("Timeout of 1000ms expired before the last committed offset for partitions [test-0] could be determined. Try tuning default.api.timeout.ms larger to relax the threshold.", Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> {
            this.consumer.committed(Collections.singleton(this.tp0), Duration.ofMillis(1000L));
        }).getMessage());
    }
}
