package org.apache.kafka.clients.consumer.internals;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.admin.NewTopicTest;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.MembershipManager;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.class */
public class HeartbeatRequestManagerTest {
    private static final String DEFAULT_GROUP_ID = "groupId";
    private static final String DEFAULT_REMOTE_ASSIGNOR = "uniform";
    private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id";
    private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
    private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
    private static final long DEFAULT_RETRY_BACKOFF_MS = 80;
    private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000;
    private static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0d;
    private static final String DEFAULT_MEMBER_ID = "member-id";
    private static final int DEFAULT_MEMBER_EPOCH = 1;
    private Time time;
    private Timer pollTimer;
    private CoordinatorRequestManager coordinatorRequestManager;
    private SubscriptionState subscriptions;
    private Metadata metadata;
    private HeartbeatRequestManager heartbeatRequestManager;
    private MembershipManager membershipManager;
    private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState;
    private HeartbeatRequestManager.HeartbeatState heartbeatState;
    private BackgroundEventHandler backgroundEventHandler;
    private LogContext logContext;

    /* renamed from: org.apache.kafka.clients.consumer.internals.HeartbeatRequestManagerTest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$protocol$Errors = new int[Errors.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.COORDINATOR_LOAD_IN_PROGRESS.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.COORDINATOR_NOT_AVAILABLE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NOT_COORDINATOR.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.UNKNOWN_MEMBER_ID.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.FENCED_MEMBER_EPOCH.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    @BeforeEach
    public void setUp() {
        this.time = new MockTime();
        this.logContext = new LogContext();
        this.pollTimer = (Timer) Mockito.spy(this.time.timer(10000L));
        this.coordinatorRequestManager = (CoordinatorRequestManager) Mockito.mock(CoordinatorRequestManager.class);
        this.heartbeatState = (HeartbeatRequestManager.HeartbeatState) Mockito.mock(HeartbeatRequestManager.HeartbeatState.class);
        this.backgroundEventHandler = (BackgroundEventHandler) Mockito.mock(BackgroundEventHandler.class);
        this.subscriptions = (SubscriptionState) Mockito.mock(SubscriptionState.class);
        this.membershipManager = (MembershipManager) Mockito.mock(MembershipManagerImpl.class);
        this.metadata = (Metadata) Mockito.mock(ConsumerMetadata.class);
        Metrics metrics = new Metrics(this.time);
        ConsumerConfig consumerConfig = (ConsumerConfig) Mockito.mock(ConsumerConfig.class);
        this.heartbeatRequestState = (HeartbeatRequestManager.HeartbeatRequestState) Mockito.spy(new HeartbeatRequestManager.HeartbeatRequestState(this.logContext, this.time, DEFAULT_RETRY_BACKOFF_MAX_MS, DEFAULT_RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF_MAX_MS, DEFAULT_HEARTBEAT_JITTER_MS));
        this.heartbeatRequestManager = new HeartbeatRequestManager(this.logContext, this.pollTimer, consumerConfig, this.coordinatorRequestManager, this.membershipManager, this.heartbeatState, this.heartbeatRequestState, this.backgroundEventHandler, metrics);
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.of(Mockito.mock(Node.class)));
    }

    private void createHeartbeatRequestStateWithZeroHeartbeatInterval() {
        this.heartbeatRequestState = (HeartbeatRequestManager.HeartbeatRequestState) Mockito.spy(new HeartbeatRequestManager.HeartbeatRequestState(this.logContext, this.time, 0L, DEFAULT_RETRY_BACKOFF_MS, DEFAULT_RETRY_BACKOFF_MAX_MS, DEFAULT_HEARTBEAT_JITTER_MS));
        this.heartbeatRequestManager = createHeartbeatRequestManager(this.coordinatorRequestManager, this.membershipManager, this.heartbeatState, this.heartbeatRequestState, this.backgroundEventHandler);
    }

    private void createHeartbeatStatAndRequestManager() {
        this.heartbeatState = new HeartbeatRequestManager.HeartbeatState(this.subscriptions, this.membershipManager, DEFAULT_MAX_POLL_INTERVAL_MS);
        this.heartbeatRequestManager = createHeartbeatRequestManager(this.coordinatorRequestManager, this.membershipManager, this.heartbeatState, this.heartbeatRequestState, this.backgroundEventHandler);
    }

    @Test
    public void testHeartBeatRequestStateToStringBase() {
        LogContext logContext = new LogContext();
        HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState(logContext, this.time, DEFAULT_RETRY_BACKOFF_MAX_MS, 100L, DEFAULT_RETRY_BACKOFF_MAX_MS, 0.2d);
        String str = new RequestState(logContext, HeartbeatRequestManager.HeartbeatRequestState.class.getName(), 100L, DEFAULT_RETRY_BACKOFF_MAX_MS).toStringBase() + ", remainingMs=" + DEFAULT_HEARTBEAT_INTERVAL_MS + ", heartbeatIntervalMs=" + DEFAULT_HEARTBEAT_INTERVAL_MS;
        heartbeatRequestState.getClass();
        Assertions.assertDoesNotThrow(heartbeatRequestState::toString);
        Assertions.assertEquals(str, heartbeatRequestState.toStringBase());
    }

    @Test
    public void testHeartbeatOnStartup() {
        Assertions.assertEquals(0, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size());
        createHeartbeatRequestStateWithZeroHeartbeatInterval();
        Assertions.assertEquals(0L, this.heartbeatRequestManager.maximumTimeToWait(this.time.milliseconds()));
        Assertions.assertEquals(1, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size());
        Assertions.assertEquals(0, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size());
    }

    @Test
    public void testSuccessfulHeartbeatTiming() {
        NetworkClientDelegate.PollResult poll = this.heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(0, poll.unsentRequests.size(), "No heartbeat should be sent while interval has not expired");
        Assertions.assertEquals(this.heartbeatRequestState.timeToNextHeartbeatMs(this.time.milliseconds()), poll.timeUntilNextPollMs);
        assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MAX_MS);
        NetworkClientDelegate.PollResult poll2 = this.heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, poll2.unsentRequests.size(), "A heartbeat should be sent when interval expires");
        NetworkClientDelegate.UnsentRequest unsentRequest = (NetworkClientDelegate.UnsentRequest) poll2.unsentRequests.get(0);
        Assertions.assertEquals(DEFAULT_RETRY_BACKOFF_MAX_MS, this.heartbeatRequestState.timeToNextHeartbeatMs(this.time.milliseconds()), "Heartbeat timer was not reset to the interval when the heartbeat request was sent.");
        this.time.sleep(333L);
        Assertions.assertEquals(0, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size(), "No heartbeat should be sent while only part of the interval has passed");
        Assertions.assertEquals(DEFAULT_RETRY_BACKOFF_MAX_MS - 333, this.heartbeatRequestState.timeToNextHeartbeatMs(this.time.milliseconds()), "Time to next interval was not properly updated.");
        unsentRequest.handler().onComplete(createHeartbeatResponse(unsentRequest, Errors.NONE));
        assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MAX_MS - 333);
    }

    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
    @ParameterizedTest
    public void testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short s) {
        createHeartbeatStatAndRequestManager();
        createHeartbeatRequestStateWithZeroHeartbeatInterval();
        this.time.sleep(DEFAULT_RETRY_BACKOFF_MAX_MS);
        Set singleton = Collections.singleton("topic1");
        Mockito.when(this.subscriptions.subscription()).thenReturn(singleton);
        this.subscriptions.subscribe(singleton, Optional.empty());
        mockJoiningMemberData(DEFAULT_GROUP_INSTANCE_ID);
        Assertions.assertEquals(0L, this.heartbeatRequestManager.maximumTimeToWait(this.time.milliseconds()));
        NetworkClientDelegate.PollResult poll = this.heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, poll.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest unsentRequest = (NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0);
        Assertions.assertInstanceOf(ConsumerGroupHeartbeatRequest.Builder.class, unsentRequest.requestBuilder());
        ConsumerGroupHeartbeatRequest build = unsentRequest.requestBuilder().build(s);
        Assertions.assertTrue(build.data().memberId().isEmpty());
        Assertions.assertEquals(0, build.data().memberEpoch());
        Assertions.assertEquals(Collections.singletonList("topic1"), build.data().subscribedTopicNames());
        Assertions.assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, build.data().rebalanceTimeoutMs());
        Assertions.assertEquals(DEFAULT_GROUP_ID, build.data().groupId());
        Assertions.assertEquals(DEFAULT_GROUP_INSTANCE_ID, build.data().instanceId());
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testSkippingHeartbeat(boolean z) {
        createHeartbeatRequestStateWithZeroHeartbeatInterval();
        Mockito.when(Boolean.valueOf(this.membershipManager.shouldSkipHeartbeat())).thenReturn(Boolean.valueOf(z));
        NetworkClientDelegate.PollResult poll = this.heartbeatRequestManager.poll(this.time.milliseconds());
        if (z) {
            Assertions.assertEquals(0, poll.unsentRequests.size());
            Assertions.assertEquals(Long.MAX_VALUE, poll.timeUntilNextPollMs);
        } else {
            Assertions.assertEquals(1, poll.unsentRequests.size());
            Assertions.assertEquals(0L, poll.timeUntilNextPollMs);
        }
    }

    @Test
    public void testTimerNotDue() {
        this.time.sleep(100L);
        NetworkClientDelegate.PollResult poll = this.heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(0, poll.unsentRequests.size());
        Assertions.assertEquals(900L, poll.timeUntilNextPollMs);
        Assertions.assertEquals(900L, this.heartbeatRequestManager.maximumTimeToWait(this.time.milliseconds()));
        Mockito.when(Boolean.valueOf(this.subscriptions.hasAutoAssignedPartitions())).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.membershipManager.shouldSkipHeartbeat())).thenReturn(true);
        Assertions.assertEquals(Long.MAX_VALUE, this.heartbeatRequestManager.poll(this.time.milliseconds()).timeUntilNextPollMs);
    }

    @Test
    public void testHeartbeatNotSentIfAnotherOneInFlight() {
        this.time.sleep(DEFAULT_RETRY_BACKOFF_MAX_MS);
        NetworkClientDelegate.PollResult poll = this.heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, poll.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest unsentRequest = (NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0);
        this.time.sleep(DEFAULT_RETRY_BACKOFF_MAX_MS);
        Assertions.assertEquals(0, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size(), "No heartbeat should be sent while a previous one is in-flight");
        this.time.sleep(DEFAULT_RETRY_BACKOFF_MAX_MS);
        Assertions.assertEquals(0, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size(), "No heartbeat should be sent when the interval expires if there is a previous HB request in-flight");
        unsentRequest.handler().onComplete(createHeartbeatResponse(unsentRequest, Errors.NONE));
        this.time.sleep(DEFAULT_RETRY_BACKOFF_MS);
        Assertions.assertEquals(1, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size(), "A next heartbeat should be sent on the first poll after receiving a response that took longer than the interval, waiting only for the minimal backoff.");
    }

    @Test
    public void testHeartbeatOutsideInterval() {
        Mockito.when(Boolean.valueOf(this.membershipManager.shouldSkipHeartbeat())).thenReturn(false);
        Mockito.when(Boolean.valueOf(this.membershipManager.shouldHeartbeatNow())).thenReturn(true);
        NetworkClientDelegate.PollResult poll = this.heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, poll.unsentRequests.size());
        Assertions.assertEquals(DEFAULT_RETRY_BACKOFF_MAX_MS, poll.timeUntilNextPollMs);
        Assertions.assertEquals(DEFAULT_RETRY_BACKOFF_MAX_MS, this.heartbeatRequestManager.maximumTimeToWait(this.time.milliseconds()));
        ((MembershipManager) Mockito.verify(this.membershipManager)).onHeartbeatRequestGenerated();
    }

    @Test
    public void testNetworkTimeout() {
        createHeartbeatRequestStateWithZeroHeartbeatInterval();
        NetworkClientDelegate.PollResult poll = this.heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, poll.unsentRequests.size());
        ((NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0)).handler().onFailure(this.time.milliseconds(), new TimeoutException("timeout"));
        ((MembershipManager) Mockito.verify(this.membershipManager)).onHeartbeatFailure(true);
        ((BackgroundEventHandler) Mockito.verify(this.backgroundEventHandler, Mockito.never())).add((BackgroundEvent) ArgumentMatchers.any());
        this.time.sleep(79L);
        Assertions.assertEquals(0, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size());
        this.time.sleep(1L);
        Assertions.assertEquals(1, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size());
    }

    @Test
    public void testFailureOnFatalException() {
        this.time.sleep(DEFAULT_RETRY_BACKOFF_MAX_MS);
        NetworkClientDelegate.PollResult poll = this.heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, poll.unsentRequests.size());
        ((NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0)).handler().onFailure(this.time.milliseconds(), new KafkaException("fatal"));
        ((MembershipManager) Mockito.verify(this.membershipManager)).onHeartbeatFailure(false);
        ((MembershipManager) Mockito.verify(this.membershipManager)).transitionToFatal();
        ((BackgroundEventHandler) Mockito.verify(this.backgroundEventHandler)).add((BackgroundEvent) ArgumentMatchers.any());
    }

    @Test
    public void testNoCoordinator() {
        Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
        NetworkClientDelegate.PollResult poll = this.heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(Long.MAX_VALUE, poll.timeUntilNextPollMs);
        Assertions.assertEquals(DEFAULT_RETRY_BACKOFF_MAX_MS, this.heartbeatRequestManager.maximumTimeToWait(this.time.milliseconds()));
        Assertions.assertEquals(0, poll.unsentRequests.size());
    }

    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
    @ParameterizedTest
    public void testValidateConsumerGroupHeartbeatRequest(short s) {
        createHeartbeatStatAndRequestManager();
        this.time.sleep(DEFAULT_RETRY_BACKOFF_MAX_MS);
        Mockito.when(this.subscriptions.subscription()).thenReturn(Collections.singleton("topic"));
        this.membershipManager.onHeartbeatSuccess(new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData().setMemberId(DEFAULT_MEMBER_ID).setMemberEpoch(1)).data());
        mockStableMemberData(DEFAULT_GROUP_INSTANCE_ID);
        NetworkClientDelegate.PollResult poll = this.heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, poll.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest unsentRequest = (NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0);
        Assertions.assertInstanceOf(ConsumerGroupHeartbeatRequest.Builder.class, unsentRequest.requestBuilder());
        ConsumerGroupHeartbeatRequest build = unsentRequest.requestBuilder().build(s);
        Assertions.assertEquals(DEFAULT_GROUP_ID, build.data().groupId());
        Assertions.assertEquals(DEFAULT_MEMBER_ID, build.data().memberId());
        Assertions.assertEquals(1, build.data().memberEpoch());
        Assertions.assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, build.data().rebalanceTimeoutMs());
        Assertions.assertEquals("topic", build.data().subscribedTopicNames().get(0));
        Assertions.assertEquals(DEFAULT_GROUP_INSTANCE_ID, build.data().instanceId());
        Assertions.assertEquals(DEFAULT_REMOTE_ASSIGNOR, build.data().serverAssignor());
    }

    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
    @ParameterizedTest
    public void testValidateConsumerGroupHeartbeatRequestAssignmentSentWhenLocalEpochChanges(short s) {
        createHeartbeatStatAndRequestManager();
        Mockito.when(Boolean.valueOf(this.membershipManager.shouldHeartbeatNow())).thenReturn(true);
        Uuid randomUuid = Uuid.randomUuid();
        ConsumerGroupHeartbeatRequestData.TopicPartitions topicPartitions = new ConsumerGroupHeartbeatRequestData.TopicPartitions();
        Map singletonMap = Collections.singletonMap(randomUuid, Utils.mkSortedSet(new Integer[]{0}));
        topicPartitions.setTopicId(randomUuid);
        topicPartitions.setPartitions(Collections.singletonList(0));
        Mockito.when(this.membershipManager.currentAssignment()).thenReturn(new MembershipManager.LocalAssignment(0L, singletonMap));
        Assertions.assertEquals(Collections.singletonList(topicPartitions), getHeartbeatRequest(this.heartbeatRequestManager, s).data().topicPartitions());
        Mockito.when(Boolean.valueOf(this.heartbeatRequestState.canSendRequest(ArgumentMatchers.anyLong()))).thenReturn(true);
        Assertions.assertNull(getHeartbeatRequest(this.heartbeatRequestManager, s).data().topicPartitions());
        Mockito.when(this.membershipManager.currentAssignment()).thenReturn(new MembershipManager.LocalAssignment(1L, singletonMap));
        Assertions.assertEquals(Collections.singletonList(topicPartitions), getHeartbeatRequest(this.heartbeatRequestManager, s).data().topicPartitions());
    }

    private ConsumerGroupHeartbeatRequest getHeartbeatRequest(HeartbeatRequestManager heartbeatRequestManager, short s) {
        NetworkClientDelegate.PollResult poll = heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, poll.unsentRequests.size());
        NetworkClientDelegate.UnsentRequest unsentRequest = (NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0);
        Assertions.assertInstanceOf(ConsumerGroupHeartbeatRequest.Builder.class, unsentRequest.requestBuilder());
        return unsentRequest.requestBuilder().build(s);
    }

    @MethodSource({"errorProvider"})
    @ParameterizedTest
    public void testHeartbeatResponseOnErrorHandling(Errors errors, boolean z) {
        this.time.sleep(DEFAULT_RETRY_BACKOFF_MAX_MS);
        NetworkClientDelegate.PollResult poll = this.heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, poll.unsentRequests.size());
        Mockito.when(Boolean.valueOf(this.subscriptions.hasAutoAssignedPartitions())).thenReturn(true);
        ClientResponse createHeartbeatResponse = createHeartbeatResponse((NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0), errors);
        ((NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0)).handler().onComplete(createHeartbeatResponse);
        ConsumerGroupHeartbeatResponse responseBody = createHeartbeatResponse.responseBody();
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$common$protocol$Errors[errors.ordinal()]) {
            case 1:
                ((MembershipManager) Mockito.verify(this.membershipManager)).onHeartbeatSuccess(responseBody.data());
                assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MAX_MS);
                break;
            case 2:
                ((BackgroundEventHandler) Mockito.verify(this.backgroundEventHandler, Mockito.never())).add((BackgroundEvent) ArgumentMatchers.any());
                assertNextHeartbeatTiming(DEFAULT_RETRY_BACKOFF_MS);
                break;
            case NewTopicTest.NUM_PARTITIONS /* 3 */:
            case 4:
                ((BackgroundEventHandler) Mockito.verify(this.backgroundEventHandler, Mockito.never())).add((BackgroundEvent) ArgumentMatchers.any());
                ((CoordinatorRequestManager) Mockito.verify(this.coordinatorRequestManager)).markCoordinatorUnknown((String) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
                assertNextHeartbeatTiming(0L);
                break;
            case 5:
            case 6:
                ((BackgroundEventHandler) Mockito.verify(this.backgroundEventHandler, Mockito.never())).add((BackgroundEvent) ArgumentMatchers.any());
                assertNextHeartbeatTiming(0L);
                break;
            default:
                if (!z) {
                    ((BackgroundEventHandler) Mockito.verify(this.backgroundEventHandler, Mockito.never())).add((BackgroundEvent) ArgumentMatchers.any());
                    assertNextHeartbeatTiming(0L);
                    break;
                } else {
                    Mockito.when(this.coordinatorRequestManager.coordinator()).thenReturn(Optional.empty());
                    ensureFatalError(errors);
                    break;
                }
        }
        if (errors != Errors.NONE) {
            ((MembershipManager) Mockito.verify(this.membershipManager)).onHeartbeatFailure(false);
        }
        if (z) {
            return;
        }
        this.time.sleep(DEFAULT_RETRY_BACKOFF_MAX_MS);
        Assertions.assertEquals(1, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size());
    }

    private void assertNextHeartbeatTiming(long j) {
        long milliseconds = this.time.milliseconds();
        Assertions.assertEquals(j, this.heartbeatRequestState.timeToNextHeartbeatMs(milliseconds));
        if (j != 0) {
            Assertions.assertFalse(this.heartbeatRequestState.canSendRequest(milliseconds));
            this.time.sleep(j);
        }
        Assertions.assertTrue(this.heartbeatRequestState.canSendRequest(this.time.milliseconds()));
    }

    @Test
    public void testHeartbeatState() {
        mockJoiningMemberData(null);
        this.heartbeatState = new HeartbeatRequestManager.HeartbeatState(this.subscriptions, this.membershipManager, DEFAULT_MAX_POLL_INTERVAL_MS);
        createHeartbeatRequestStateWithZeroHeartbeatInterval();
        ConsumerGroupHeartbeatRequestData buildRequestData = this.heartbeatState.buildRequestData();
        Assertions.assertEquals(DEFAULT_GROUP_ID, buildRequestData.groupId());
        Assertions.assertEquals("", buildRequestData.memberId());
        Assertions.assertEquals(0, buildRequestData.memberEpoch());
        Assertions.assertNull(buildRequestData.instanceId());
        Assertions.assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, buildRequestData.rebalanceTimeoutMs());
        Assertions.assertEquals(Collections.emptyList(), buildRequestData.subscribedTopicNames());
        Assertions.assertEquals(DEFAULT_REMOTE_ASSIGNOR, buildRequestData.serverAssignor());
        Assertions.assertEquals(Collections.emptyList(), buildRequestData.topicPartitions());
        Mockito.when(this.membershipManager.state()).thenReturn(MemberState.STABLE);
        Mockito.when(Boolean.valueOf(this.subscriptions.hasAutoAssignedPartitions())).thenReturn(true);
        Mockito.when(this.subscriptions.rebalanceListener()).thenReturn(Optional.empty());
        mockStableMemberData(null);
        ConsumerGroupHeartbeatRequestData buildRequestData2 = this.heartbeatState.buildRequestData();
        Assertions.assertEquals(DEFAULT_GROUP_ID, buildRequestData2.groupId());
        Assertions.assertEquals(DEFAULT_MEMBER_ID, buildRequestData2.memberId());
        Assertions.assertEquals(1, buildRequestData2.memberEpoch());
        Assertions.assertNull(buildRequestData2.instanceId());
        Assertions.assertEquals(-1, buildRequestData2.rebalanceTimeoutMs());
        Assertions.assertNull(buildRequestData2.subscribedTopicNames());
        Assertions.assertNull(buildRequestData2.serverAssignor());
        Assertions.assertEquals(Collections.emptyList(), buildRequestData2.topicPartitions());
        this.subscriptions.subscribe(Collections.singleton("topic1"), Optional.empty());
        Mockito.when(this.subscriptions.subscription()).thenReturn(Collections.singleton("topic1"));
        mockRejoiningMemberData();
        ConsumerGroupHeartbeatRequestData buildRequestData3 = this.heartbeatState.buildRequestData();
        Assertions.assertEquals(DEFAULT_GROUP_ID, buildRequestData3.groupId());
        Assertions.assertEquals(DEFAULT_MEMBER_ID, buildRequestData3.memberId());
        Assertions.assertEquals(0, buildRequestData3.memberEpoch());
        Assertions.assertNull(buildRequestData3.instanceId());
        Assertions.assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, buildRequestData3.rebalanceTimeoutMs());
        Assertions.assertEquals(Collections.singletonList("topic1"), buildRequestData3.subscribedTopicNames());
        Assertions.assertEquals(DEFAULT_REMOTE_ASSIGNOR, buildRequestData3.serverAssignor());
        Assertions.assertEquals(Collections.emptyList(), buildRequestData3.topicPartitions());
        ConsumerGroupHeartbeatRequestData buildRequestData4 = this.heartbeatState.buildRequestData();
        Assertions.assertEquals(DEFAULT_GROUP_ID, buildRequestData4.groupId());
        Assertions.assertEquals(DEFAULT_MEMBER_ID, buildRequestData4.memberId());
        Assertions.assertEquals(0, buildRequestData4.memberEpoch());
        Assertions.assertNull(buildRequestData4.instanceId());
        Assertions.assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, buildRequestData4.rebalanceTimeoutMs());
        Assertions.assertEquals(Collections.singletonList("topic1"), buildRequestData4.subscribedTopicNames());
        Assertions.assertEquals(DEFAULT_REMOTE_ASSIGNOR, buildRequestData4.serverAssignor());
        Assertions.assertEquals(Collections.emptyList(), buildRequestData4.topicPartitions());
        ConsumerGroupHeartbeatResponseData.TopicPartitions topicPartitions = new ConsumerGroupHeartbeatResponseData.TopicPartitions();
        Uuid randomUuid = Uuid.randomUuid();
        topicPartitions.setTopicId(randomUuid);
        topicPartitions.setPartitions(Collections.singletonList(0));
        new ConsumerGroupHeartbeatResponseData.Assignment().setTopicPartitions(Collections.singletonList(topicPartitions));
        Mockito.when(this.metadata.topicNames()).thenReturn(Collections.singletonMap(randomUuid, "topic1"));
    }

    @Test
    public void testPollTimerExpiration() {
        this.heartbeatRequestManager = createHeartbeatRequestManager(this.coordinatorRequestManager, this.membershipManager, this.heartbeatState, this.heartbeatRequestState, this.backgroundEventHandler);
        Mockito.when(Boolean.valueOf(this.membershipManager.shouldSkipHeartbeat())).thenReturn(false);
        this.time.sleep(10000L);
        assertHeartbeat(this.heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
        ((MembershipManager) Mockito.verify(this.membershipManager)).transitionToSendingLeaveGroup(true);
        ((HeartbeatRequestManager.HeartbeatState) Mockito.verify(this.heartbeatState)).reset();
        ((HeartbeatRequestManager.HeartbeatRequestState) Mockito.verify(this.heartbeatRequestState)).reset();
        ((MembershipManager) Mockito.verify(this.membershipManager)).onHeartbeatRequestGenerated();
        Mockito.when(Boolean.valueOf(this.membershipManager.shouldSkipHeartbeat())).thenReturn(true);
        assertNoHeartbeat(this.heartbeatRequestManager);
        this.heartbeatRequestManager.resetPollTimer(this.time.milliseconds());
        Assertions.assertTrue(this.pollTimer.notExpired());
        ((MembershipManager) Mockito.verify(this.membershipManager)).maybeRejoinStaleMember();
        Mockito.when(Boolean.valueOf(this.membershipManager.shouldSkipHeartbeat())).thenReturn(false);
        assertHeartbeat(this.heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
    }

    @Test
    public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeaving() {
        Mockito.when(Boolean.valueOf(this.membershipManager.shouldSkipHeartbeat())).thenReturn(false);
        Mockito.when(Boolean.valueOf(this.membershipManager.isLeavingGroup())).thenReturn(true);
        this.time.sleep(10000L);
        NetworkClientDelegate.PollResult poll = this.heartbeatRequestManager.poll(this.time.milliseconds());
        ((MembershipManager) Mockito.verify(this.membershipManager, Mockito.never())).transitionToSendingLeaveGroup(ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(1, poll.unsentRequests.size(), "A heartbeat request should be generated to complete the ongoing leaving operation that was triggered before the poll timer expired.");
    }

    @Test
    public void testisExpiredByUsedForLogging() {
        Mockito.when(Boolean.valueOf(this.membershipManager.shouldSkipHeartbeat())).thenReturn(false);
        this.time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + 5);
        Mockito.when(Boolean.valueOf(this.membershipManager.isLeavingGroup())).thenReturn(false);
        Assertions.assertEquals(1, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size());
        ((MembershipManager) Mockito.verify(this.membershipManager)).transitionToSendingLeaveGroup(true);
        ((Timer) Mockito.verify(this.pollTimer, Mockito.never())).isExpiredBy();
        Mockito.clearInvocations(new Timer[]{this.pollTimer});
        this.heartbeatRequestManager.resetPollTimer(this.time.milliseconds());
        ((Timer) Mockito.verify(this.pollTimer)).isExpiredBy();
    }

    @Test
    public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() {
        this.heartbeatRequestManager = createHeartbeatRequestManager(this.coordinatorRequestManager, this.membershipManager, this.heartbeatState, this.heartbeatRequestState, this.backgroundEventHandler);
        this.time.sleep(DEFAULT_RETRY_BACKOFF_MAX_MS);
        NetworkClientDelegate.PollResult poll = this.heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, poll.unsentRequests.size());
        Mockito.when(Boolean.valueOf(this.subscriptions.hasAutoAssignedPartitions())).thenReturn(true);
        ((NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0)).handler().onComplete(createHeartbeatResponse((NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0), Errors.FENCED_MEMBER_EPOCH));
        ((MembershipManager) Mockito.verify(this.membershipManager)).transitionToFenced();
        ((HeartbeatRequestManager.HeartbeatRequestState) Mockito.verify(this.heartbeatRequestState)).onFailedAttempt(ArgumentMatchers.anyLong());
        ((HeartbeatRequestManager.HeartbeatRequestState) Mockito.verify(this.heartbeatRequestState)).reset();
        Mockito.when(Boolean.valueOf(this.membershipManager.shouldSkipHeartbeat())).thenReturn(true);
        Assertions.assertEquals(0, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size(), "Member should not send heartbeats while FENCED");
        Mockito.when(Boolean.valueOf(this.membershipManager.shouldSkipHeartbeat())).thenReturn(false);
        Assertions.assertEquals(1, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size(), "Fenced member should resume heartbeat after transitioning to JOINING");
    }

    @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT)
    @ParameterizedTest
    public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(short s) {
        this.time.sleep(DEFAULT_RETRY_BACKOFF_MAX_MS);
        Assertions.assertEquals(1, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size());
        Assertions.assertEquals(0, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size(), "No heartbeat should be sent while a previous one is in-flight");
        Mockito.when(this.membershipManager.state()).thenReturn(MemberState.LEAVING);
        Mockito.when(this.heartbeatState.buildRequestData()).thenReturn(new ConsumerGroupHeartbeatRequestData().setMemberEpoch(-1));
        Assertions.assertEquals(-1, getHeartbeatRequest(this.heartbeatRequestManager, s).data().memberEpoch());
        Mockito.when(Boolean.valueOf(this.membershipManager.shouldSkipHeartbeat())).thenReturn(true);
        Assertions.assertEquals(0, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size());
    }

    private void assertHeartbeat(HeartbeatRequestManager heartbeatRequestManager, int i) {
        NetworkClientDelegate.PollResult poll = heartbeatRequestManager.poll(this.time.milliseconds());
        Assertions.assertEquals(1, poll.unsentRequests.size());
        Assertions.assertEquals(i, poll.timeUntilNextPollMs);
        ((NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0)).handler().onComplete(createHeartbeatResponse((NetworkClientDelegate.UnsentRequest) poll.unsentRequests.get(0), Errors.NONE));
    }

    private void assertNoHeartbeat(HeartbeatRequestManager heartbeatRequestManager) {
        Assertions.assertEquals(0, heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size());
    }

    private void ensureFatalError(Errors errors) {
        ((MembershipManager) Mockito.verify(this.membershipManager)).transitionToFatal();
        ArgumentCaptor forClass = ArgumentCaptor.forClass(ErrorEvent.class);
        ((BackgroundEventHandler) Mockito.verify(this.backgroundEventHandler)).add((BackgroundEvent) forClass.capture());
        Assertions.assertInstanceOf(errors.exception().getClass(), ((ErrorEvent) forClass.getValue()).error(), "The fatal error propagated to the app thread does not match the error received in the heartbeat response.");
        ensureHeartbeatStopped();
    }

    private void ensureHeartbeatStopped() {
        this.time.sleep(DEFAULT_RETRY_BACKOFF_MAX_MS);
        Assertions.assertEquals(0, this.heartbeatRequestManager.poll(this.time.milliseconds()).unsentRequests.size());
    }

    private static Collection<Arguments> errorProvider() {
        return Arrays.asList(Arguments.of(new Object[]{Errors.NONE, false}), Arguments.of(new Object[]{Errors.COORDINATOR_NOT_AVAILABLE, false}), Arguments.of(new Object[]{Errors.COORDINATOR_LOAD_IN_PROGRESS, false}), Arguments.of(new Object[]{Errors.NOT_COORDINATOR, false}), Arguments.of(new Object[]{Errors.GROUP_AUTHORIZATION_FAILED, true}), Arguments.of(new Object[]{Errors.INVALID_REQUEST, true}), Arguments.of(new Object[]{Errors.UNKNOWN_MEMBER_ID, false}), Arguments.of(new Object[]{Errors.FENCED_MEMBER_EPOCH, false}), Arguments.of(new Object[]{Errors.UNSUPPORTED_ASSIGNOR, true}), Arguments.of(new Object[]{Errors.UNSUPPORTED_VERSION, true}), Arguments.of(new Object[]{Errors.UNRELEASED_INSTANCE_ID, true}), Arguments.of(new Object[]{Errors.FENCED_INSTANCE_ID, true}), Arguments.of(new Object[]{Errors.GROUP_MAX_SIZE_REACHED, true}));
    }

    private ClientResponse createHeartbeatResponse(NetworkClientDelegate.UnsentRequest unsentRequest, Errors errors) {
        ConsumerGroupHeartbeatResponseData memberEpoch = new ConsumerGroupHeartbeatResponseData().setErrorCode(errors.code()).setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS).setMemberId(DEFAULT_MEMBER_ID).setMemberEpoch(1);
        if (errors != Errors.NONE) {
            memberEpoch.setErrorMessage("stubbed error message");
        }
        return new ClientResponse(new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT, ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), "client-id", 1), unsentRequest.handler(), "0", this.time.milliseconds(), this.time.milliseconds(), false, (UnsupportedVersionException) null, (AuthenticationException) null, new ConsumerGroupHeartbeatResponse(memberEpoch));
    }

    private ConsumerConfig config() {
        Properties properties = new Properties();
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.setProperty("bootstrap.servers", "localhost:9999");
        properties.setProperty("max.poll.interval.ms", String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS));
        properties.setProperty("retry.backoff.ms", String.valueOf(DEFAULT_RETRY_BACKOFF_MS));
        properties.setProperty("retry.backoff.max.ms", String.valueOf(DEFAULT_RETRY_BACKOFF_MAX_MS));
        properties.setProperty("heartbeat.interval.ms", String.valueOf(DEFAULT_HEARTBEAT_INTERVAL_MS));
        return new ConsumerConfig(properties);
    }

    private HeartbeatRequestManager createHeartbeatRequestManager(CoordinatorRequestManager coordinatorRequestManager, MembershipManager membershipManager, HeartbeatRequestManager.HeartbeatState heartbeatState, HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState, BackgroundEventHandler backgroundEventHandler) {
        LogContext logContext = new LogContext();
        this.pollTimer = this.time.timer(10000L);
        return new HeartbeatRequestManager(logContext, this.pollTimer, config(), coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, backgroundEventHandler, new Metrics());
    }

    private void mockJoiningMemberData(String str) {
        Mockito.when(this.membershipManager.state()).thenReturn(MemberState.JOINING);
        Mockito.when(this.membershipManager.groupInstanceId()).thenReturn(Optional.ofNullable(str));
        Mockito.when(this.membershipManager.memberId()).thenReturn("");
        Mockito.when(Integer.valueOf(this.membershipManager.memberEpoch())).thenReturn(0);
        Mockito.when(this.membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
        Mockito.when(this.membershipManager.currentAssignment()).thenReturn(MembershipManager.LocalAssignment.NONE);
        Mockito.when(this.membershipManager.serverAssignor()).thenReturn(Optional.of(DEFAULT_REMOTE_ASSIGNOR));
    }

    private void mockRejoiningMemberData() {
        Mockito.when(this.membershipManager.state()).thenReturn(MemberState.JOINING);
        Mockito.when(Integer.valueOf(this.membershipManager.memberEpoch())).thenReturn(0);
        Mockito.when(this.membershipManager.groupInstanceId()).thenReturn(Optional.empty());
    }

    private void mockStableMemberData(String str) {
        Mockito.when(this.membershipManager.groupInstanceId()).thenReturn(Optional.ofNullable(str));
        Mockito.when(this.membershipManager.currentAssignment()).thenReturn(new MembershipManager.LocalAssignment(0L, Collections.emptyMap()));
        Mockito.when(this.membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID);
        Mockito.when(this.membershipManager.memberId()).thenReturn(DEFAULT_MEMBER_ID);
        Mockito.when(Integer.valueOf(this.membershipManager.memberEpoch())).thenReturn(1);
        Mockito.when(this.membershipManager.serverAssignor()).thenReturn(Optional.of(DEFAULT_REMOTE_ASSIGNOR));
    }
}
