package org.apache.kafka.clients.consumer.internals;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder;
import org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.KafkaChannelTest;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.class */
public class HeartbeatRequestManagerTest {
    // Group id used when a test rebuilds the fixture through resetWithZeroHeartbeatInterval(...).
    private static final String DEFAULT_GROUP_ID = "groupId";
    // Not referenced in the visible part of this file; presumably the metric group read by getMetric(...) — TODO confirm.
    private static final String CONSUMER_COORDINATOR_METRICS = "consumer-coordinator-metrics";
    // Fixture builder; recreated by setUp(...) and closed by cleanup().
    private ConsumerTestBuilder testBuilder;
    // Clock taken from the fixture; tests advance it explicitly with time.sleep(...).
    private Time time;
    // Never assigned in the visible part of this file; presumably set by createHeartbeatRequestManager(...) — TODO confirm.
    private Timer pollTimer;
    // Collaborators below are copied out of the fixture in setUp(...); some tests replace them with mocks.
    private CoordinatorRequestManager coordinatorRequestManager;
    private SubscriptionState subscriptions;
    private Metadata metadata;
    // The object under test.
    private HeartbeatRequestManager heartbeatRequestManager;
    private MembershipManager membershipManager;
    private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState;
    private HeartbeatRequestManager.HeartbeatState heartbeatState;
    private BackgroundEventHandler backgroundEventHandler;
    // Queue inspected by the group-metadata tests to observe emitted background events.
    private BlockingQueue<BackgroundEvent> backgroundEventQueue;
    // Created fresh in setUp(...) and again in testHeartbeatMetrics().
    private Metrics metrics;
    // Timing values (milliseconds) passed to the hand-built HeartbeatRequestState in
    // testPollTimerExpiration() / testHeartbeatMetrics() and used there to advance the clock.
    private long retryBackoffMs = 80;
    private int heartbeatIntervalMs = 1000;
    private int maxPollIntervalMs = 10000;
    private long retryBackoffMaxMs = 1000;
    // Not referenced in the visible part of this file; the tests use the same values as inline literals.
    private final String memberId = "member-id";
    private final int memberEpoch = 1;

    /* renamed from: org.apache.kafka.clients.consumer.internals.HeartbeatRequestManagerTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest$1.class */
    /**
     * Decompiler rendering of the compiler-generated lookup table behind a {@code switch} over
     * {@link Errors}. The table is indexed by {@code Errors.ordinal()} and holds the case label
     * (1..6) that {@code testHeartbeatResponseOnErrorHandling} switches on; constants that are not
     * listed keep the array default of 0 and therefore reach that switch's {@code default} branch.
     */
    static /* synthetic */ class AnonymousClass1 {
        // One slot per Errors constant, sized at class-initialisation time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$protocol$Errors = new int[Errors.values().length];

        static {
            // Each entry is filled in its own try block: a NoSuchFieldError for one constant is
            // ignored on purpose so that the remaining entries are still populated.
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.COORDINATOR_LOAD_IN_PROGRESS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.COORDINATOR_NOT_AVAILABLE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NOT_COORDINATOR.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.UNKNOWN_MEMBER_ID.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.FENCED_MEMBER_EPOCH.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /** Builds the fixture with the builder's default group information before every test. */
    @BeforeEach
    public void setUp() {
        Optional<ConsumerTestBuilder.GroupInformation> defaultGroup = ConsumerTestBuilder.createDefaultGroupInformation();
        setUp(defaultGroup);
    }

    /**
     * Rebuilds the whole fixture from a fresh {@link ConsumerTestBuilder} and caches the
     * collaborators the tests interact with. The coordinator lookup is stubbed to always
     * return a node so that heartbeats can be sent.
     *
     * @param groupInfo group settings handed to the builder
     * @throws IllegalStateException if the builder did not create one of the group-related managers
     */
    private void setUp(Optional<ConsumerTestBuilder.GroupInformation> groupInfo) {
        testBuilder = new ConsumerTestBuilder(groupInfo, true, false);

        // Collaborators the builder exposes directly.
        time = testBuilder.time;
        metadata = testBuilder.metadata;
        subscriptions = testBuilder.subscriptions;
        backgroundEventQueue = testBuilder.backgroundEventQueue;
        backgroundEventHandler = testBuilder.backgroundEventHandler;

        // Collaborators the builder wraps in an Optional.
        coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
        heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new);
        heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new);
        heartbeatState = testBuilder.heartbeatState.orElseThrow(IllegalStateException::new);
        membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new);

        metrics = new Metrics(time);

        Node coordinator = new Node(1, "localhost", 9999);
        Mockito.when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinator));
    }

    /**
     * Tears the current fixture down and rebuilds it for {@code DEFAULT_GROUP_ID} with the
     * "uniform" assignor. The two zero arguments are presumably the heartbeat interval and the
     * jitter (as the method name suggests) — confirm against {@code GroupInformation}.
     *
     * @param groupInstanceId optional group instance id to configure
     */
    private void resetWithZeroHeartbeatInterval(Optional<String> groupInstanceId) {
        cleanup();
        ConsumerTestBuilder.GroupInformation zeroIntervalGroup = new ConsumerTestBuilder.GroupInformation(
            DEFAULT_GROUP_ID,
            groupInstanceId,
            0,
            0.0d,
            Optional.of("uniform"));
        setUp(Optional.of(zeroIntervalGroup));
    }

    /** Closes the fixture builder after every test; a no-op when no fixture was built. */
    @AfterEach
    public void cleanup() {
        if (testBuilder == null) {
            return;
        }
        testBuilder.close();
    }

    /**
     * With the default fixture nothing is sent; once the fixture is rebuilt with a zero heartbeat
     * interval and the member is stable, exactly one heartbeat is produced and the following poll
     * yields none.
     */
    @Test
    public void testHeartbeatOnStartup() {
        // Default fixture, before mockStableMember(): no request is expected.
        int requestsBeforeJoin = heartbeatRequestManager.poll(time.milliseconds()).unsentRequests.size();
        Assertions.assertEquals(0, requestsBeforeJoin);

        resetWithZeroHeartbeatInterval(Optional.empty());
        mockStableMember();
        long waitMs = heartbeatRequestManager.maximumTimeToWait(time.milliseconds());
        Assertions.assertEquals(0L, waitMs);

        int firstPollRequests = heartbeatRequestManager.poll(time.milliseconds()).unsentRequests.size();
        Assertions.assertEquals(1, firstPollRequests);

        // The second poll must not emit another heartbeat.
        int secondPollRequests = heartbeatRequestManager.poll(time.milliseconds()).unsentRequests.size();
        Assertions.assertEquals(0, secondPollRequests);
    }

    /**
     * The first heartbeat after subscribing must carry everything needed to join the group:
     * empty member id, epoch 0, the subscribed topics, the rebalance timeout, the group id and
     * the group instance id.
     *
     * @param version CONSUMER_GROUP_HEARTBEAT API version the request is built for
     */
    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
    @ParameterizedTest
    public void testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short version) {
        resetWithZeroHeartbeatInterval(Optional.of("group-instance-id"));
        subscriptions.subscribe(Collections.singleton("topic1"), Optional.empty());
        membershipManager.onSubscriptionUpdated();

        Assertions.assertEquals(0L, heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
        Assertions.assertTrue(request.requestBuilder() instanceof ConsumerGroupHeartbeatRequest.Builder);
        // The builder is only known by its general type here, so the built request has to be
        // narrowed explicitly (the instanceof assertion above guards the cast).
        ConsumerGroupHeartbeatRequest heartbeat = (ConsumerGroupHeartbeatRequest) request.requestBuilder().build(version);

        Assertions.assertTrue(heartbeat.data().memberId().isEmpty());
        Assertions.assertEquals(0, heartbeat.data().memberEpoch());
        Assertions.assertEquals(Collections.singletonList("topic1"), heartbeat.data().subscribedTopicNames());
        Assertions.assertEquals(10000, heartbeat.data().rebalanceTimeoutMs());
        Assertions.assertEquals(DEFAULT_GROUP_ID, heartbeat.data().groupId());
        Assertions.assertEquals("group-instance-id", heartbeat.data().instanceId());
    }

    /**
     * When the membership manager asks to skip the heartbeat no request is produced and the
     * manager reports an unbounded wait; otherwise one request is produced with a zero wait.
     *
     * @param shouldSkipHeartbeat value stubbed for {@code MembershipManager.shouldSkipHeartbeat()}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testSkippingHeartbeat(boolean shouldSkipHeartbeat) {
        resetWithZeroHeartbeatInterval(Optional.empty());
        Mockito.when(membershipManager.shouldSkipHeartbeat()).thenReturn(shouldSkipHeartbeat);
        Mockito.when(heartbeatRequestState.canSendRequest(ArgumentMatchers.anyLong())).thenReturn(true);

        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());

        int expectedRequests = shouldSkipHeartbeat ? 0 : 1;
        long expectedWaitMs = shouldSkipHeartbeat ? Long.MAX_VALUE : 0L;
        Assertions.assertEquals(expectedRequests, result.unsentRequests.size());
        Assertions.assertEquals(expectedWaitMs, result.timeUntilNextPollMs);
    }

    /**
     * 100 ms into a heartbeat interval nothing is sent and 900 ms remain; after a transition to
     * the fatal state the poll reports an unbounded wait.
     */
    @Test
    public void testTimerNotDue() {
        mockStableMember();
        time.sleep(100L);

        NetworkClientDelegate.PollResult beforeDue = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, beforeDue.unsentRequests.size());
        Assertions.assertEquals(900L, beforeDue.timeUntilNextPollMs);
        Assertions.assertEquals(900L, heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));

        Mockito.when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true);
        membershipManager.transitionToFatal();
        NetworkClientDelegate.PollResult afterFatal = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(Long.MAX_VALUE, afterFatal.timeUntilNextPollMs);
    }

    /**
     * A heartbeat is sent without waiting for the interval when the membership manager reports
     * {@code shouldHeartbeatNow()}; the next one is then due a full interval (1000 ms) later and
     * the membership manager is told that a request was sent.
     */
    @Test
    public void testHeartbeatOutsideInterval() {
        Mockito.when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
        Mockito.when(membershipManager.shouldHeartbeatNow()).thenReturn(true);

        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());

        Assertions.assertEquals(1, result.unsentRequests.size());
        Assertions.assertEquals(1000L, result.timeUntilNextPollMs);
        Assertions.assertEquals(1000L, heartbeatRequestManager.maximumTimeToWait(time.milliseconds()));
        Mockito.verify(membershipManager).onHeartbeatRequestSent();
    }

    /**
     * After a request fails with a timeout, no new heartbeat is sent for 79 ms and one is sent
     * once a further millisecond has passed (80 ms in total, the same value as this class's
     * {@code retryBackoffMs} — presumably the fixture's retry backoff).
     */
    @Test
    public void testNetworkTimeout() {
        resetWithZeroHeartbeatInterval(Optional.empty());
        mockStableMember();
        Node coordinator = new Node(1, "localhost", 9999);
        Mockito.when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinator));

        NetworkClientDelegate.PollResult firstPoll = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, firstPoll.unsentRequests.size());

        // Fail the in-flight heartbeat with a timeout.
        NetworkClientDelegate.UnsentRequest inFlight = firstPoll.unsentRequests.get(0);
        inFlight.handler().onFailure(time.milliseconds(), new TimeoutException("timeout"));

        time.sleep(79L);
        NetworkClientDelegate.PollResult duringBackoff = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(0, duringBackoff.unsentRequests.size());

        time.sleep(1L);
        NetworkClientDelegate.PollResult afterBackoff = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, afterBackoff.unsentRequests.size());
    }

    /**
     * A request failure with a plain {@link KafkaException} must move the member to the fatal
     * state and publish a background event.
     */
    @Test
    public void testFailureOnFatalException() {
        resetWithZeroHeartbeatInterval(Optional.empty());
        mockStableMember();
        Node coordinator = new Node(1, "localhost", 9999);
        Mockito.when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinator));

        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest inFlight = result.unsentRequests.get(0);
        inFlight.handler().onFailure(time.milliseconds(), new KafkaException("fatal"));

        Mockito.verify(membershipManager).transitionToFatal();
        Mockito.verify(backgroundEventHandler).add(ArgumentMatchers.any());
    }

    /**
     * Without a known coordinator the poll sends nothing and reports an unbounded wait, while
     * {@code maximumTimeToWait} still reports the heartbeat interval (1000 ms).
     */
    @Test
    public void testNoCoordinator() {
        Mockito.when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());

        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());

        Assertions.assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs);
        long maxWaitMs = heartbeatRequestManager.maximumTimeToWait(time.milliseconds());
        Assertions.assertEquals(1000L, maxWaitMs);
        Assertions.assertEquals(0, result.unsentRequests.size());
    }

    /**
     * Checks every field of the heartbeat built for a member that already has an id and epoch:
     * group id, member id, epoch, rebalance timeout, subscribed topics, instance id and server
     * assignor.
     *
     * @param version CONSUMER_GROUP_HEARTBEAT API version the request is built for
     */
    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
    @ParameterizedTest
    public void testValidateConsumerGroupHeartbeatRequest(short version) {
        resetWithZeroHeartbeatInterval(Optional.of("group-instance-id"));
        mockStableMember();

        // Typed collections instead of raw List/HashSet so that subscribe(...) is checked by the compiler.
        List<String> subscribedTopics = Collections.singletonList("topic");
        subscriptions.subscribe(new HashSet<>(subscribedTopics), Optional.empty());

        // Feed the member id and epoch that the next request is expected to echo back.
        ConsumerGroupHeartbeatResponse joinResponse = new ConsumerGroupHeartbeatResponse(
            new ConsumerGroupHeartbeatResponseData()
                .setMemberId("member-id")
                .setMemberEpoch(1));
        membershipManager.onHeartbeatResponseReceived(joinResponse.data());

        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
        Assertions.assertTrue(request.requestBuilder() instanceof ConsumerGroupHeartbeatRequest.Builder);
        // The builder is only known by its general type here, so the built request has to be
        // narrowed explicitly (the instanceof assertion above guards the cast).
        ConsumerGroupHeartbeatRequest heartbeat = (ConsumerGroupHeartbeatRequest) request.requestBuilder().build(version);

        Assertions.assertEquals(DEFAULT_GROUP_ID, heartbeat.data().groupId());
        Assertions.assertEquals("member-id", heartbeat.data().memberId());
        Assertions.assertEquals(1, heartbeat.data().memberEpoch());
        Assertions.assertEquals(10000, heartbeat.data().rebalanceTimeoutMs());
        Assertions.assertEquals(subscribedTopics, heartbeat.data().subscribedTopicNames());
        Assertions.assertEquals("group-instance-id", heartbeat.data().instanceId());
        Assertions.assertEquals("uniform", heartbeat.data().serverAssignor());
    }

    /** The first successful heartbeat response publishes an event with the member id and epoch it carried. */
    @Test
    public void testConsumerGroupMetadataFirstUpdate() {
        GroupMetadataUpdateEvent expected = new GroupMetadataUpdateEvent(1, "member-id");
        GroupMetadataUpdateEvent actual = makeFirstGroupMetadataUpdate("member-id", 1);
        Assertions.assertEquals(expected, actual);
    }

    /** A second response carrying the same member id and epoch must not publish another event. */
    @Test
    public void testConsumerGroupMetadataUpdateWithSameUpdate() {
        makeFirstGroupMetadataUpdate("member-id", 1);

        time.sleep(2000L);
        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
        ClientResponse unchangedResponse = createHeartbeatResponse(request, Errors.NONE);
        request.handler().onComplete(unchangedResponse);

        Assertions.assertEquals(0, backgroundEventQueue.size());
    }

    /**
     * A response with a null member id but a new epoch publishes an event that keeps the
     * previously known member id and carries the new epoch.
     */
    @Test
    public void testConsumerGroupMetadataUpdateWithMemberIdNullButMemberEpochUpdated() {
        makeFirstGroupMetadataUpdate("member-id", 1);

        time.sleep(2000L);
        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
        request.handler().onComplete(createHeartbeatResponseWithMemberIdNull(request, Errors.NONE, 2));

        Assertions.assertEquals(1, backgroundEventQueue.size());
        // The queue yields the general event type; hold it as such instead of assigning it to a
        // GroupMetadataUpdateEvent variable, which the compiler rejects.
        BackgroundEvent event = backgroundEventQueue.poll();
        Assertions.assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type());
        Assertions.assertEquals(new GroupMetadataUpdateEvent(2, "member-id"), event);
    }

    /**
     * A response with a different member id but the same epoch publishes an event carrying the
     * new member id and the unchanged epoch.
     */
    @Test
    public void testConsumerGroupMetadataUpdateWithMemberIdUpdatedAndMemberEpochSame() {
        makeFirstGroupMetadataUpdate("member-id", 1);

        time.sleep(2000L);
        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
        request.handler().onComplete(createHeartbeatResponse(request, Errors.NONE, "updatedMemberId", 1));

        Assertions.assertEquals(1, backgroundEventQueue.size());
        // The queue yields the general event type; hold it as such instead of assigning it to a
        // GroupMetadataUpdateEvent variable, which the compiler rejects.
        BackgroundEvent event = backgroundEventQueue.poll();
        Assertions.assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type());
        Assertions.assertEquals(new GroupMetadataUpdateEvent(1, "updatedMemberId"), event);
    }

    /**
     * Rebuilds the fixture with a zero heartbeat interval, completes the first heartbeat with a
     * successful response carrying the given identity, and returns the single background event
     * that this produced.
     *
     * @param str member id placed in the heartbeat response
     * @param i   member epoch placed in the heartbeat response
     * @return the published event, after asserting that its type is GROUP_METADATA_UPDATE
     */
    private GroupMetadataUpdateEvent makeFirstGroupMetadataUpdate(String str, int i) {
        resetWithZeroHeartbeatInterval(Optional.empty());
        mockStableMember();
        Mockito.when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999)));

        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
        request.handler().onComplete(createHeartbeatResponse(request, Errors.NONE, str, i));

        Assertions.assertEquals(1, backgroundEventQueue.size());
        BackgroundEvent event = backgroundEventQueue.poll();
        Assertions.assertEquals(BackgroundEvent.Type.GROUP_METADATA_UPDATE, event.type());
        // Narrow only after the type assertion above has passed.
        return (GroupMetadataUpdateEvent) event;
    }

    @MethodSource({"errorProvider"})
    @ParameterizedTest
    public void testHeartbeatResponseOnErrorHandling(Errors errors, boolean z) {
        mockStableMember();
        this.time.sleep(1000L);
        NetworkClientDelegate.PollResult poll = this.heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, poll.unsentRequests.size());
        Mockito.when(Boolean.valueOf(this.subscriptions.hasAutoAssignedPartitions())).thenReturn(true);
        ClientResponse createHeartbeatResponse = createHeartbeatResponse((NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0), errors);
        ((NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0)).handler().onComplete(createHeartbeatResponse);
        ConsumerGroupHeartbeatResponse responseBody = createHeartbeatResponse.responseBody();
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$Errors[errors.ordinal()]) {
            case 1:
                ((BackgroundEventHandler) Mockito.verify(this.backgroundEventHandler)).add((BackgroundEvent) ArgumentMatchers.any(GroupMetadataUpdateEvent.class));
                ((MembershipManager) Mockito.verify(this.membershipManager, Mockito.times(2))).onHeartbeatResponseReceived(responseBody.data());
                Assertions.assertEquals(1000L, this.heartbeatRequestState.nextHeartbeatMs(this.time.milliseconds()));
                break;
            case 2:
                ((BackgroundEventHandler) Mockito.verify(this.backgroundEventHandler, Mockito.never())).add((BackgroundEvent) ArgumentMatchers.any());
                Assertions.assertEquals(80L, this.heartbeatRequestState.nextHeartbeatMs(this.time.milliseconds()), "Request should backoff after receiving a coordinator load in progress error. ");
                break;
            case 3:
            case 4:
                ((BackgroundEventHandler) Mockito.verify(this.backgroundEventHandler, Mockito.never())).add((BackgroundEvent) ArgumentMatchers.any());
                ((CoordinatorRequestManager) Mockito.verify(this.coordinatorRequestManager)).markCoordinatorUnknown((String) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
                Assertions.assertEquals(0L, this.heartbeatRequestState.nextHeartbeatMs(this.time.milliseconds()), "Request should not apply backoff so that the next heartbeat is sent as soon as the new coordinator is discovered.");
                break;
            case 5:
            case 6:
                ((BackgroundEventHandler) Mockito.verify(this.backgroundEventHandler, Mockito.never())).add((BackgroundEvent) ArgumentMatchers.any());
                Assertions.assertEquals(0L, this.heartbeatRequestState.nextHeartbeatMs(this.time.milliseconds()), "Request should not apply backoff so that the next heartbeat to rejoin is sent as soon as the fenced member releases its assignment.");
                break;
            default:
                if (!z) {
                    ((BackgroundEventHandler) Mockito.verify(this.backgroundEventHandler, Mockito.never())).add((BackgroundEvent) ArgumentMatchers.any());
                    Assertions.assertEquals(0L, this.heartbeatRequestState.nextHeartbeatMs(this.time.milliseconds()));
                    break;
                } else {
                    ensureFatalError();
                    break;
                }
        }
        if (z) {
            return;
        }
        this.time.sleep(1000L);
        Assertions.assertEquals(1, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size());
    }

    /**
     * Walks the heartbeat payload through four stages — initial, stable, fenced/rejoining, and
     * receiving an assignment — and checks which fields are populated at each stage together
     * with the resulting member state.
     */
    @Test
    public void testHeartbeatState() {
        // Stage 1: the initial request carries the full set of fields with their starting values.
        ConsumerGroupHeartbeatRequestData initial = heartbeatState.buildRequestData();
        Assertions.assertEquals("group-id", initial.groupId());
        Assertions.assertEquals("", initial.memberId());
        Assertions.assertEquals(0, initial.memberEpoch());
        Assertions.assertNull(initial.instanceId());
        Assertions.assertEquals(10000, initial.rebalanceTimeoutMs());
        Assertions.assertEquals(Collections.emptyList(), initial.subscribedTopicNames());
        Assertions.assertEquals("uniform", initial.serverAssignor());
        Assertions.assertEquals(Collections.emptyList(), initial.topicPartitions());
        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());

        // Stage 2: once stable, only identity fields are set; the rest is null / -1.
        mockStableMember();
        ConsumerGroupHeartbeatRequestData stable = heartbeatState.buildRequestData();
        Assertions.assertEquals("group-id", stable.groupId());
        Assertions.assertEquals("member-id", stable.memberId());
        Assertions.assertEquals(1, stable.memberEpoch());
        Assertions.assertNull(stable.instanceId());
        Assertions.assertEquals(-1, stable.rebalanceTimeoutMs());
        Assertions.assertNull(stable.subscribedTopicNames());
        Assertions.assertNull(stable.serverAssignor());
        Assertions.assertNull(stable.topicPartitions());
        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.STABLE, membershipManager.state());

        // Stage 3: after subscribing and being fenced, the epoch is 0 and the subscription is included.
        subscriptions.subscribe(Collections.singleton("topic1"), Optional.empty());
        membershipManager.onSubscriptionUpdated();
        membershipManager.transitionToFenced();
        ConsumerGroupHeartbeatRequestData rejoining = heartbeatState.buildRequestData();
        Assertions.assertEquals("group-id", rejoining.groupId());
        Assertions.assertEquals("member-id", rejoining.memberId());
        Assertions.assertEquals(0, rejoining.memberEpoch());
        Assertions.assertNull(rejoining.instanceId());
        Assertions.assertEquals(-1, rejoining.rebalanceTimeoutMs());
        Assertions.assertEquals(Collections.singletonList("topic1"), rejoining.subscribedTopicNames());
        Assertions.assertNull(rejoining.serverAssignor());
        Assertions.assertNull(rejoining.topicPartitions());
        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.JOINING, membershipManager.state());

        // Stage 4: a response that assigns partition 0 of "topic1" moves the member to RECONCILING.
        Uuid topicId = Uuid.randomUuid();
        ConsumerGroupHeartbeatResponseData.TopicPartitions assignedPartitions = new ConsumerGroupHeartbeatResponseData.TopicPartitions();
        assignedPartitions.setTopicId(topicId);
        assignedPartitions.setPartitions(Collections.singletonList(0));
        ConsumerGroupHeartbeatResponseData.Assignment targetAssignment = new ConsumerGroupHeartbeatResponseData.Assignment();
        targetAssignment.setTopicPartitions(Collections.singletonList(assignedPartitions));
        ConsumerGroupHeartbeatResponseData responseData = new ConsumerGroupHeartbeatResponseData()
            .setHeartbeatIntervalMs(1000)
            .setMemberId("member-id")
            .setMemberEpoch(1)
            .setAssignment(targetAssignment);
        ConsumerGroupHeartbeatResponse assignmentResponse = new ConsumerGroupHeartbeatResponse(responseData);
        // The topic id must be resolvable to a name through the metadata.
        Mockito.when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, "topic1"));
        membershipManager.onHeartbeatResponseReceived(assignmentResponse.data());
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
    }

    /**
     * Uses a manager wired from mocks: after the max poll interval has elapsed, one more
     * heartbeat goes out, the heartbeat state and request state are reset and the member becomes
     * stale, after which nothing is sent until the poll timer is reset.
     */
    @Test
    public void testPollTimerExpiration() {
        coordinatorRequestManager = Mockito.mock(CoordinatorRequestManager.class);
        membershipManager = Mockito.mock(MembershipManager.class);
        heartbeatState = Mockito.mock(HeartbeatRequestManager.HeartbeatState.class);
        HeartbeatRequestManager.HeartbeatRequestState realRequestState = new HeartbeatRequestManager.HeartbeatRequestState(
            new LogContext(), time, heartbeatIntervalMs, retryBackoffMs, retryBackoffMaxMs, 0.0d);
        heartbeatRequestState = Mockito.spy(realRequestState);
        backgroundEventHandler = Mockito.mock(BackgroundEventHandler.class);
        heartbeatRequestManager = createHeartbeatRequestManager(
            coordinatorRequestManager,
            membershipManager,
            heartbeatState,
            heartbeatRequestState,
            backgroundEventHandler);

        Node coordinator = new Node(1, "localhost", 9999);
        Mockito.when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinator));
        Mockito.when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
        Mockito.when(membershipManager.state()).thenReturn(MemberState.STABLE);

        // Let the max poll interval elapse without resetting the poll timer.
        time.sleep(maxPollIntervalMs);
        assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
        Mockito.verify(heartbeatState).reset();
        Mockito.verify(heartbeatRequestState).reset();
        Mockito.verify(membershipManager).transitionToStale();

        // No heartbeats until the poll timer is reset.
        assertNoHeartbeat(heartbeatRequestManager);
        heartbeatRequestManager.resetPollTimer(time.milliseconds());
        Assertions.assertTrue(pollTimer.notExpired());
        assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
    }

    /**
     * Checks that the four heartbeat metrics are registered and that the total, rate and
     * "seconds ago" values follow two heartbeats sent one interval apart.
     */
    @Test
    public void testHeartbeatMetrics() {
        coordinatorRequestManager = Mockito.mock(CoordinatorRequestManager.class);
        membershipManager = Mockito.mock(MembershipManager.class);
        heartbeatState = Mockito.mock(HeartbeatRequestManager.HeartbeatState.class);
        time = new MockTime();
        metrics = new Metrics(time);
        // Heartbeat interval of 0 so that the first heartbeat is due immediately.
        heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState(
            new LogContext(), time, 0L, retryBackoffMs, retryBackoffMaxMs, 0.0d);
        backgroundEventHandler = Mockito.mock(BackgroundEventHandler.class);
        heartbeatRequestManager = createHeartbeatRequestManager(
            coordinatorRequestManager,
            membershipManager,
            heartbeatState,
            heartbeatRequestState,
            backgroundEventHandler);

        Node coordinator = new Node(1, "localhost", 9999);
        Mockito.when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(coordinator));
        Mockito.when(membershipManager.state()).thenReturn(MemberState.STABLE);

        // All four metrics must exist before any heartbeat is sent.
        Assertions.assertNotNull(getMetric("heartbeat-response-time-max"));
        Assertions.assertNotNull(getMetric("heartbeat-rate"));
        Assertions.assertNotNull(getMetric("heartbeat-total"));
        Assertions.assertNotNull(getMetric("last-heartbeat-seconds-ago"));

        // First heartbeat, then one interval passes.
        assertHeartbeat(heartbeatRequestManager, 0);
        time.sleep(heartbeatIntervalMs);
        Double expectedTotalAfterFirst = Double.valueOf(1.0d);
        Assertions.assertEquals(expectedTotalAfterFirst, getMetric("heartbeat-total").metricValue());
        Double expectedSecondsAgo = Double.valueOf(TimeUnit.MILLISECONDS.toSeconds(heartbeatIntervalMs));
        Assertions.assertEquals(expectedSecondsAgo, getMetric("last-heartbeat-seconds-ago").metricValue());

        // Second heartbeat.
        assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
        double heartbeatRate = (Double) getMetric("heartbeat-rate").metricValue();
        Assertions.assertEquals(0.06d, heartbeatRate, 0.005d);
        Double expectedTotalAfterSecond = Double.valueOf(2.0d);
        Assertions.assertEquals(expectedTotalAfterSecond, getMetric("heartbeat-total").metricValue());

        // "Seconds ago" must follow an arbitrary pause of 0..10 seconds.
        int pauseSeconds = new Random().nextInt(11);
        time.sleep(pauseSeconds * 1000);
        Assertions.assertEquals(Double.valueOf(pauseSeconds), getMetric("last-heartbeat-seconds-ago").metricValue());
    }

    /**
     * Polls the manager, asserts that exactly one heartbeat was produced with the expected time
     * until the next poll, and completes that request with a successful response.
     *
     * @param heartbeatRequestManager manager to poll
     * @param i                       expected {@code timeUntilNextPollMs} of the poll result
     */
    private void assertHeartbeat(HeartbeatRequestManager heartbeatRequestManager, int i) {
        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
        Assertions.assertEquals(1, result.unsentRequests.size());
        Assertions.assertEquals(i, result.timeUntilNextPollMs);

        NetworkClientDelegate.UnsentRequest request = result.unsentRequests.get(0);
        ClientResponse success = createHeartbeatResponse(request, Errors.NONE);
        request.handler().onComplete(success);
    }

    /**
     * Polls the manager and asserts that no heartbeat request was produced.
     *
     * @param heartbeatRequestManager manager to poll
     */
    private void assertNoHeartbeat(HeartbeatRequestManager heartbeatRequestManager) {
        int pendingRequests = heartbeatRequestManager.poll(time.milliseconds()).unsentRequests.size();
        Assertions.assertEquals(0, pendingRequests);
    }

    /**
     * Drives the membership manager to the STABLE state by signalling a subscription update and
     * delivering a heartbeat response with member id "member-id", epoch 1 and a 1000 ms interval.
     */
    private void mockStableMember() {
        membershipManager.onSubscriptionUpdated();

        ConsumerGroupHeartbeatResponseData joinData = new ConsumerGroupHeartbeatResponseData()
            .setHeartbeatIntervalMs(1000)
            .setMemberId("member-id")
            .setMemberEpoch(1);
        ConsumerGroupHeartbeatResponse joinResponse = new ConsumerGroupHeartbeatResponse(joinData);
        membershipManager.onHeartbeatResponseReceived(joinResponse.data());

        Assertions.assertEquals(MemberState.STABLE, membershipManager.state());
    }

    /**
     * Verifies that the membership manager was transitioned to fatal, that some background event
     * was added to the background event handler, and that heartbeating has stopped.
     */
    private void ensureFatalError() {
        Mockito.verify(this.membershipManager).transitionToFatal();
        Mockito.verify(this.backgroundEventHandler).add(ArgumentMatchers.<BackgroundEvent>any());
        ensureHeartbeatStopped();
    }

    /**
     * Advances the mock clock by one second, then asserts that the member is in the
     * {@link MemberState#FATAL} state and that polling the heartbeat manager yields no requests.
     */
    private void ensureHeartbeatStopped() {
        this.time.sleep(1000L);

        // The state is checked before polling, mirroring the order callers rely on.
        Assertions.assertEquals(MemberState.FATAL, this.membershipManager.state());

        int pendingRequests = this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size();
        Assertions.assertEquals(0, pendingRequests);
    }

    /**
     * Supplies (error, flag) pairs for parameterized tests. Each entry pairs an {@link Errors}
     * value with a boolean; judging by the entries the flag presumably marks errors that are
     * expected to be fatal (confirm against the consuming test).
     *
     * @return the fixed list of argument pairs
     */
    private static Collection<Arguments> errorProvider() {
        return Arrays.asList(
                Arguments.of(Errors.NONE, false),
                Arguments.of(Errors.COORDINATOR_NOT_AVAILABLE, false),
                Arguments.of(Errors.COORDINATOR_LOAD_IN_PROGRESS, false),
                Arguments.of(Errors.NOT_COORDINATOR, false),
                Arguments.of(Errors.GROUP_AUTHORIZATION_FAILED, true),
                Arguments.of(Errors.INVALID_REQUEST, true),
                Arguments.of(Errors.UNKNOWN_MEMBER_ID, false),
                Arguments.of(Errors.FENCED_MEMBER_EPOCH, false),
                Arguments.of(Errors.UNSUPPORTED_ASSIGNOR, true),
                Arguments.of(Errors.UNSUPPORTED_VERSION, true),
                Arguments.of(Errors.UNRELEASED_INSTANCE_ID, true),
                Arguments.of(Errors.GROUP_MAX_SIZE_REACHED, true));
    }

    /**
     * Builds a heartbeat response for the given request using the default member id
     * ("member-id") and member epoch (1).
     *
     * @param unsentRequest request the response belongs to
     * @param errors error to encode in the response
     * @return the stubbed client response
     */
    private ClientResponse createHeartbeatResponse(NetworkClientDelegate.UnsentRequest unsentRequest, Errors errors) {
        final String defaultMemberId = "member-id";
        final int defaultMemberEpoch = 1;
        return createHeartbeatResponse(unsentRequest, errors, defaultMemberId, defaultMemberEpoch);
    }

    /**
     * Builds a heartbeat response for the given request whose member id is {@code null}.
     *
     * @param unsentRequest request the response belongs to
     * @param errors error to encode in the response
     * @param i member epoch to set on the response
     * @return the stubbed client response
     */
    private ClientResponse createHeartbeatResponseWithMemberIdNull(NetworkClientDelegate.UnsentRequest unsentRequest, Errors errors, int i) {
        final String absentMemberId = null;
        return createHeartbeatResponse(unsentRequest, errors, absentMemberId, i);
    }

    /**
     * Builds a stubbed {@link ClientResponse} wrapping a consumer group heartbeat response.
     * The response data carries the error code, a heartbeat interval of 1000, the given member
     * id and epoch, and a fixed error message whenever the error is not {@link Errors#NONE}.
     *
     * @param unsentRequest request whose handler is attached to the response
     * @param errors error to encode in the response
     * @param str member id to set on the response (may be {@code null})
     * @param i member epoch to set on the response
     * @return the stubbed client response
     */
    private ClientResponse createHeartbeatResponse(NetworkClientDelegate.UnsentRequest unsentRequest, Errors errors, String str, int i) {
        ConsumerGroupHeartbeatResponseData responseData = new ConsumerGroupHeartbeatResponseData();
        responseData.setErrorCode(errors.code());
        responseData.setHeartbeatIntervalMs(1000);
        responseData.setMemberId(str);
        responseData.setMemberEpoch(i);
        if (errors != Errors.NONE) {
            responseData.setErrorMessage("stubbed error message");
        }

        RequestHeader header = new RequestHeader(
                ApiKeys.CONSUMER_GROUP_HEARTBEAT,
                ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(),
                "client-id",
                1);

        // The clock is deliberately read twice (created / received timestamps), as in the
        // original call, so any per-read behavior of the mock clock is left untouched.
        return new ClientResponse(
                header,
                unsentRequest.handler(),
                KafkaChannelTest.CHANNEL_ID,
                this.time.milliseconds(),
                this.time.milliseconds(),
                false,
                (UnsupportedVersionException) null,
                (AuthenticationException) null,
                new ConsumerGroupHeartbeatResponse(responseData));
    }

    /**
     * Creates the consumer configuration used by the test: string deserializers for keys and
     * values, a fixed bootstrap server, and the poll/backoff/heartbeat timings taken from this
     * test's fields.
     *
     * @return a new {@link ConsumerConfig} built from those properties
     */
    private ConsumerConfig config() {
        Properties props = new Properties();

        // Deserializers are registered as class objects rather than class names.
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);

        props.setProperty("bootstrap.servers", "localhost:9999");

        // Timing-related settings come from the test fixture's fields.
        props.setProperty("max.poll.interval.ms", String.valueOf(this.maxPollIntervalMs));
        props.setProperty("retry.backoff.ms", String.valueOf(this.retryBackoffMs));
        props.setProperty("retry.backoff.max.ms", String.valueOf(this.retryBackoffMaxMs));
        props.setProperty("heartbeat.interval.ms", String.valueOf(this.heartbeatIntervalMs));

        return new ConsumerConfig(props);
    }

    /**
     * Looks up a metric by name within the {@code CONSUMER_COORDINATOR_METRICS} group of this
     * test's metrics registry.
     *
     * @param str metric name
     * @return the registered metric, or {@code null} if the registry has no such entry
     */
    private KafkaMetric getMetric(String str) {
        return this.metrics
                .metrics()
                .get(this.metrics.metricName(str, CONSUMER_COORDINATOR_METRICS));
    }

    /**
     * Creates a {@link HeartbeatRequestManager} wired with the supplied collaborators. As a side
     * effect, {@code pollTimer} is replaced with a fresh timer of {@code maxPollIntervalMs}
     * obtained from the test clock, and that timer is handed to the new manager.
     *
     * @param coordinatorRequestManager coordinator request manager to use
     * @param membershipManager membership manager to use
     * @param heartbeatState heartbeat state to use
     * @param heartbeatRequestState heartbeat request state to use
     * @param backgroundEventHandler background event handler to use
     * @return the newly constructed manager
     */
    private HeartbeatRequestManager createHeartbeatRequestManager(CoordinatorRequestManager coordinatorRequestManager, MembershipManager membershipManager, HeartbeatRequestManager.HeartbeatState heartbeatState, HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState, BackgroundEventHandler backgroundEventHandler) {
        this.pollTimer = this.time.timer(this.maxPollIntervalMs);
        return new HeartbeatRequestManager(
                new LogContext(),
                this.pollTimer,
                config(),
                coordinatorRequestManager,
                membershipManager,
                heartbeatState,
                heartbeatRequestState,
                backgroundEventHandler,
                this.metrics);
    }
}
