package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.internals.Utils;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/MembershipManagerImplTest.class */
public class MembershipManagerImplTest {
    // Consumer group id passed to every MembershipManagerImpl constructed in this test class.
    private static final String GROUP_ID = "test-group";
    // Member id the tests expect the manager to report after a successful heartbeat response
    // (presumably the id set by the heartbeat-response helper defined elsewhere in this class).
    private static final String MEMBER_ID = "test-member-1";
    // Rebalance timeout argument passed to the MembershipManagerImpl constructor
    // (unit is not visible here; presumably milliseconds, to be confirmed).
    private static final int REBALANCE_TIMEOUT = 100;
    // Member epoch value; presumably the epoch carried by the test heartbeat responses (to be confirmed).
    private static final int MEMBER_EPOCH = 1;
    private final LogContext logContext = new LogContext();
    // The collaborators below are re-populated from a fresh ConsumerTestBuilder in setup().
    private SubscriptionState subscriptionState;
    private ConsumerMetadata metadata;
    private CommitRequestManager commitRequestManager;
    // Owns the collaborators above; closed in tearDown().
    private ConsumerTestBuilder testBuilder;
    private BlockingQueue<BackgroundEvent> backgroundEventQueue;
    private BackgroundEventHandler backgroundEventHandler;
    private Time time;

    /**
     * Creates a fresh {@link ConsumerTestBuilder} before each test and caches the collaborators
     * that the tests pass to {@link MembershipManagerImpl}.
     *
     * @throws IllegalStateException if the builder does not provide a commit request manager
     */
    @BeforeEach
    public void setup() {
        ConsumerTestBuilder builder = new ConsumerTestBuilder(ConsumerTestBuilder.createDefaultGroupInformation());
        // Publish the builder first so that tearDown() can close it even if a later step throws.
        testBuilder = builder;
        metadata = builder.metadata;
        subscriptionState = builder.subscriptions;
        commitRequestManager = builder.commitRequestManager.orElseThrow(IllegalStateException::new);
        backgroundEventQueue = builder.backgroundEventQueue;
        backgroundEventHandler = builder.backgroundEventHandler;
        time = builder.time;
    }

    /**
     * Closes the test builder created in {@code setup()}, if one was created.
     */
    @AfterEach
    public void tearDown() {
        if (testBuilder == null) {
            return;
        }
        testBuilder.close();
    }

    /**
     * Builds a Mockito spy around a new {@link MembershipManagerImpl} that has no optional
     * constructor arguments set, and moves it to the joining state.
     *
     * @return the spied manager, after {@code transitionToJoining()} has been invoked on it
     */
    private MembershipManagerImpl createMembershipManagerJoiningGroup() {
        MembershipManagerImpl spiedManager = Mockito.spy(new MembershipManagerImpl(
                GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
                subscriptionState, commitRequestManager, metadata, logContext,
                backgroundEventHandler, time, Optional.empty()));
        spiedManager.transitionToJoining();
        return spiedManager;
    }

    /**
     * Builds a Mockito spy around a new {@link MembershipManagerImpl} and moves it to the
     * joining state.
     *
     * @param str value wrapped with {@code Optional.ofNullable} and passed as the second
     *            constructor argument (presumably the group instance id); may be {@code null}
     * @return the spied manager, after {@code transitionToJoining()} has been invoked on it
     */
    private MembershipManagerImpl createMembershipManagerJoiningGroup(String str) {
        MembershipManagerImpl spiedManager = Mockito.spy(new MembershipManagerImpl(
                GROUP_ID, Optional.ofNullable(str), REBALANCE_TIMEOUT, Optional.empty(),
                subscriptionState, commitRequestManager, metadata, logContext,
                backgroundEventHandler, time, Optional.empty()));
        spiedManager.transitionToJoining();
        return spiedManager;
    }

    /**
     * Builds a plain (not spied) {@link MembershipManagerImpl} and moves it to the joining state.
     *
     * @param str  value wrapped with {@code Optional.ofNullable} and passed as the second
     *             constructor argument (presumably the group instance id); may be {@code null}
     * @param str2 value wrapped with {@code Optional.ofNullable} and passed as the fourth
     *             constructor argument (presumably the server assignor name); may be {@code null}
     * @return the manager, after {@code transitionToJoining()} has been invoked on it
     */
    private MembershipManagerImpl createMembershipManagerJoiningGroup(String str, String str2) {
        MembershipManagerImpl manager = new MembershipManagerImpl(
                GROUP_ID, Optional.ofNullable(str), REBALANCE_TIMEOUT, Optional.ofNullable(str2),
                subscriptionState, commitRequestManager, metadata, logContext,
                backgroundEventHandler, time, Optional.empty());
        manager.transitionToJoining();
        return manager;
    }

    /**
     * {@code serverAssignor()} is empty when the manager is created without one, and returns the
     * supplied name when the manager is created with one.
     */
    @Test
    public void testMembershipManagerServerAssignor() {
        MembershipManagerImpl withoutAssignor = createMembershipManagerJoiningGroup();
        Assertions.assertEquals(Optional.empty(), withoutAssignor.serverAssignor());

        MembershipManagerImpl withAssignor = createMembershipManagerJoiningGroup("instance1", "Uniform");
        Assertions.assertEquals(Optional.of("Uniform"), withAssignor.serverAssignor());
    }

    /**
     * Creating a manager must not throw, either through the no-argument factory or when both
     * optional string arguments are {@code null}.
     */
    @Test
    public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
        String absentFirstArgument = null;
        String absentSecondArgument = null;
        // No assertions: the test passes as long as neither construction path throws.
        createMembershipManagerJoiningGroup();
        createMembershipManagerJoiningGroup(absentFirstArgument, absentSecondArgument);
    }

    /**
     * Walks a manager through join, leave, unsubscribed and a second join.
     *
     * <p>NOTE(review): invocations on {@code metadata} are cleared after the first join, but no
     * verification against {@code metadata} follows, so only the state transitions are checked.
     */
    @Test
    public void testMembershipManagerRegistersForClusterMetadataUpdatesOnFirstJoin() {
        MembershipManagerImpl manager = new MembershipManagerImpl(
                GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
                subscriptionState, commitRequestManager, metadata, logContext,
                backgroundEventHandler, time, Optional.empty());

        // First join.
        manager.transitionToJoining();
        Mockito.clearInvocations(metadata);

        // Leave the group and wait for the leave heartbeat to go out.
        receiveEmptyAssignment(manager);
        mockLeaveGroup();
        manager.leaveGroup();
        Assertions.assertEquals(MemberState.LEAVING, manager.state());
        manager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.UNSUBSCRIBED, manager.state());

        // Second join.
        manager.transitionToJoining();
    }

    /**
     * After {@code mockJoinAndReceiveAssignment(true)} the member is ACKNOWLEDGING, and it
     * becomes STABLE once the heartbeat request has been sent.
     */
    @Test
    public void testReconcilingWhenReceivingAssignmentFoundInMetadata() {
        MembershipManagerImpl manager = mockJoinAndReceiveAssignment(true);
        Assertions.assertEquals(MemberState.ACKNOWLEDGING, manager.state());

        manager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.STABLE, manager.state());
    }

    /**
     * A heartbeat response built from a {@code null} assignment must not move the member to
     * RECONCILING; a response built from {@code createAssignment(true)} must.
     */
    @Test
    public void testTransitionToReconcilingOnlyIfAssignmentReceived() {
        MembershipManagerImpl manager = createMembershipManagerJoiningGroup();
        Assertions.assertEquals(MemberState.JOINING, manager.state());

        // Response without an assignment.
        manager.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(null).data());
        Assertions.assertNotEquals(MemberState.RECONCILING, manager.state());

        // Response carrying an assignment.
        manager.onHeartbeatResponseReceived(
                createConsumerGroupHeartbeatResponse(createAssignment(true)).data());
        Assertions.assertEquals(MemberState.RECONCILING, manager.state());
    }

    /**
     * A stable member that gets fenced keeps its member id but has its epoch reset to 0.
     */
    @Test
    public void testMemberIdAndEpochResetOnFencedMembers() {
        MembershipManagerImpl manager = createMembershipManagerJoiningGroup();
        manager.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(null).data());
        Assertions.assertEquals(MemberState.STABLE, manager.state());
        Assertions.assertEquals(MEMBER_ID, manager.memberId());
        Assertions.assertEquals(MEMBER_EPOCH, manager.memberEpoch());

        mockMemberHasAutoAssignedPartition();
        manager.transitionToFenced();

        // The id survives fencing; the epoch does not.
        Assertions.assertEquals(MEMBER_ID, manager.memberId());
        Assertions.assertEquals(0, manager.memberEpoch());
    }

    /**
     * A stable member that transitions to fatal ends in FATAL and clears its assignment via
     * {@code assignFromSubscribed} with an empty set.
     */
    @Test
    public void testTransitionToFatal() {
        MembershipManagerImpl manager = createMembershipManagerJoiningGroup();
        manager.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(null).data());
        Assertions.assertEquals(MemberState.STABLE, manager.state());
        Assertions.assertEquals(MEMBER_ID, manager.memberId());
        Assertions.assertEquals(MEMBER_EPOCH, manager.memberEpoch());

        Mockito.when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
        manager.transitionToFatal();

        Assertions.assertEquals(MemberState.FATAL, manager.state());
        Mockito.verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
    }

    /**
     * A newly built manager starts UNSUBSCRIBED; after starting to join, a fatal transition
     * leaves it in FATAL.
     */
    @Test
    public void testTransitionToFailedWhenTryingToJoin() {
        MembershipManagerImpl manager = new MembershipManagerImpl(
                GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
                subscriptionState, commitRequestManager, metadata, logContext,
                backgroundEventHandler, time, Optional.empty());
        Assertions.assertEquals(MemberState.UNSUBSCRIBED, manager.state());

        manager.transitionToJoining();
        Mockito.when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
        manager.transitionToFatal();

        Assertions.assertEquals(MemberState.FATAL, manager.state());
    }

    /**
     * Fencing a stable member runs the shared fenced-member checks and clears the assignment
     * via {@code assignFromSubscribed} with an empty set.
     */
    @Test
    public void testFencingWhenStateIsStable() {
        MembershipManagerImpl stableMember = createMemberInStableState();
        testFencedMemberReleasesAssignmentAndTransitionsToJoining(stableMember);
        Mockito.verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
    }

    /**
     * A registered state listener is told about the epoch and member id when the member becomes
     * stable, and receives empty values when the member transitions to FATAL.
     */
    @Test
    public void testListenersGetNotifiedOnTransitionsToFatal() {
        MembershipManagerImpl manager = createMembershipManagerJoiningGroup();
        MemberStateListener listener = Mockito.mock(MemberStateListener.class);
        manager.registerStateListener(listener);

        mockStableMember(manager);
        Mockito.verify(listener).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH), Optional.of(MEMBER_ID));
        // Forget the notification above so the verification below only sees the fatal transition.
        Mockito.clearInvocations(listener);

        manager.transitionToFatal();
        Assertions.assertEquals(MemberState.FATAL, manager.state());
        Mockito.verify(listener).onMemberEpochUpdated(Optional.empty(), Optional.empty());
    }

    /**
     * A registered state listener is told about the epoch and member id when the member becomes
     * stable, and receives empty values when the member starts leaving the group.
     */
    @Test
    public void testListenersGetNotifiedOnTransitionsToLeavingGroup() {
        MembershipManagerImpl manager = createMembershipManagerJoiningGroup();
        MemberStateListener listener = Mockito.mock(MemberStateListener.class);
        manager.registerStateListener(listener);

        mockStableMember(manager);
        Mockito.verify(listener).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH), Optional.of(MEMBER_ID));
        // Forget the notification above so the verification below only sees the leave transition.
        Mockito.clearInvocations(listener);

        mockLeaveGroup();
        manager.leaveGroup();
        Assertions.assertEquals(MemberState.LEAVING, manager.state());
        Mockito.verify(listener).onMemberEpochUpdated(Optional.empty(), Optional.empty());
    }

    /**
     * A registered state listener is notified when a heartbeat response carries a new member
     * epoch, and is not notified again when a second response carries the same epoch.
     */
    @Test
    public void testListenersGetNotifiedOfMemberEpochUpdatesOnlyIfItChanges() {
        MembershipManagerImpl manager = createMembershipManagerJoiningGroup();
        MemberStateListener listener = Mockito.mock(MemberStateListener.class);
        manager.registerStateListener(listener);
        int epoch = 5;

        // First response with the epoch: the listener must be notified.
        manager.onHeartbeatResponseReceived(new ConsumerGroupHeartbeatResponseData()
                .setErrorCode(Errors.NONE.code())
                .setMemberId(MEMBER_ID)
                .setMemberEpoch(epoch));
        Mockito.verify(listener).onMemberEpochUpdated(Optional.of(epoch), Optional.of(MEMBER_ID));
        Mockito.clearInvocations(listener);

        // A fresh response object with the same epoch: no further notification is expected.
        manager.onHeartbeatResponseReceived(new ConsumerGroupHeartbeatResponseData()
                .setErrorCode(Errors.NONE.code())
                .setMemberId(MEMBER_ID)
                .setMemberEpoch(epoch));
        // Matchers rely on type inference instead of raw Optional casts.
        Mockito.verify(listener, Mockito.never())
                .onMemberEpochUpdated(ArgumentMatchers.any(), ArgumentMatchers.any());
    }

    /**
     * Feeds the manager a heartbeat response built from a {@code null} assignment and asserts
     * that it ends up STABLE with the expected member id and epoch.
     *
     * @param membershipManagerImpl manager to drive into the stable state
     */
    private void mockStableMember(MembershipManagerImpl membershipManagerImpl) {
        membershipManagerImpl.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(null).data());

        Assertions.assertEquals(MemberState.STABLE, membershipManagerImpl.state());
        Assertions.assertEquals(MEMBER_ID, membershipManagerImpl.memberId());
        Assertions.assertEquals(MEMBER_EPOCH, membershipManagerImpl.memberEpoch());
    }

    /**
     * Fencing a member that is RECONCILING runs the shared fenced-member checks and clears the
     * assignment via {@code assignFromSubscribed} with an empty set.
     */
    @Test
    public void testFencingWhenStateIsReconciling() {
        MembershipManagerImpl reconcilingMember = mockJoinAndReceiveAssignment(false);
        Assertions.assertEquals(MemberState.RECONCILING, reconcilingMember.state());

        testFencedMemberReleasesAssignmentAndTransitionsToJoining(reconcilingMember);
        Mockito.verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
    }

    /**
     * A member fenced while in PREPARE_LEAVING passes the shared fence-is-no-op checks and moves
     * to UNSUBSCRIBED. It stays UNSUBSCRIBED, skipping heartbeats, after the pending callback
     * completes.
     */
    @Test
    public void testFencingWhenStateIsPrepareLeaving() {
        MembershipManagerImpl member = createMemberInStableState();
        ConsumerRebalanceListenerCallbackCompletedEvent pendingCallback =
                mockPrepareLeavingStuckOnUserCallback(member, consumerRebalanceListenerInvoker());
        Assertions.assertEquals(MemberState.PREPARE_LEAVING, member.state());

        // Fence the member while the leave callback is still pending.
        Mockito.clearInvocations(subscriptionState);
        member.transitionToFenced();
        testFenceIsNoOp(member);
        Assertions.assertEquals(MemberState.UNSUBSCRIBED, member.state());

        // Completing the callback afterwards must not change the state.
        completeCallback(pendingCallback, member);
        Assertions.assertEquals(MemberState.UNSUBSCRIBED, member.state());
        Assertions.assertTrue(member.shouldSkipHeartbeat());
    }

    /**
     * An assignment received while the member is in PREPARE_LEAVING is ignored: nothing awaits
     * reconciliation and no reconciliation is marked in progress. Once the pending callback
     * completes the member moves on to LEAVING.
     */
    @Test
    public void testNewAssignmentIgnoredWhenStateIsPrepareLeaving() {
        MembershipManagerImpl member = createMemberInStableState();
        ConsumerRebalanceListenerCallbackCompletedEvent pendingCallback =
                mockPrepareLeavingStuckOnUserCallback(member, consumerRebalanceListenerInvoker());
        Assertions.assertEquals(MemberState.PREPARE_LEAVING, member.state());

        // Deliver a new assignment while the leave callback is still pending.
        Uuid topicId = Uuid.randomUuid();
        mockOwnedPartitionAndAssignmentReceived(member, topicId, "topic1", Collections.emptyList());
        receiveAssignment(topicId, Arrays.asList(0, 1), member);

        Assertions.assertEquals(MemberState.PREPARE_LEAVING, member.state());
        Assertions.assertTrue(member.topicsAwaitingReconciliation().isEmpty());
        Mockito.verify(member, Mockito.never()).markReconciliationInProgress();

        completeCallback(pendingCallback, member);
        Assertions.assertEquals(MemberState.LEAVING, member.state());
    }

    /**
     * Fencing is a no-op both while the member is LEAVING and after it has become UNSUBSCRIBED:
     * the state is unchanged in each case.
     */
    @Test
    public void testFencingWhenStateIsLeaving() {
        MembershipManagerImpl member = createMemberInStableState();
        mockLeaveGroup();
        member.leaveGroup();
        Assertions.assertEquals(MemberState.LEAVING, member.state());

        // Fenced while LEAVING.
        Mockito.clearInvocations(subscriptionState);
        member.transitionToFenced();
        testFenceIsNoOp(member);
        Assertions.assertEquals(MemberState.LEAVING, member.state());

        // The leave heartbeat goes out and the member becomes UNSUBSCRIBED.
        member.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.UNSUBSCRIBED, member.state());

        // Fenced while UNSUBSCRIBED.
        Mockito.clearInvocations(subscriptionState);
        member.transitionToFenced();
        testFenceIsNoOp(member);
        Assertions.assertEquals(MemberState.UNSUBSCRIBED, member.state());
    }

    /**
     * Leaving the group sets the member epoch to -2 for a member created with
     * {@code "instance1"} and to -1 for a member created with {@code null}.
     *
     * <p>NOTE(review): these presumably are the static-member and dynamic-member leave epochs;
     * confirm against the heartbeat request constants.
     */
    @Test
    public void testLeaveGroupEpoch() {
        MembershipManagerImpl memberWithInstanceId = createMemberInStableState("instance1");
        mockLeaveGroup();
        memberWithInstanceId.leaveGroup();
        Assertions.assertEquals(MemberState.LEAVING, memberWithInstanceId.state());
        Assertions.assertEquals(-2, memberWithInstanceId.memberEpoch());

        MembershipManagerImpl memberWithoutInstanceId = createMemberInStableState(null);
        mockLeaveGroup();
        memberWithoutInstanceId.leaveGroup();
        Assertions.assertEquals(MemberState.LEAVING, memberWithoutInstanceId.state());
        Assertions.assertEquals(-1, memberWithoutInstanceId.memberEpoch());
    }

    /**
     * If the member goes FATAL while a revocation commit is still pending, completing that
     * commit later must not touch the subscription and must not move the member to
     * ACKNOWLEDGING.
     */
    @Test
    public void testDelayedReconciliationResultDiscardedIfMemberNotInReconcilingStateAnymore() {
        MembershipManagerImpl member = createMemberInStableState();
        Uuid topicId = Uuid.randomUuid();
        mockOwnedPartitionAndAssignmentReceived(member, topicId, "topic1",
                Collections.singletonList(new TopicIdPartition(topicId, new TopicPartition("topic1", 0))));
        CompletableFuture<Void> pendingCommit = mockEmptyAssignmentAndRevocationStuckOnCommit(member);

        // The member fails fatally while the commit is outstanding.
        Mockito.when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
        member.transitionToFatal();
        Mockito.verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
        Mockito.clearInvocations(subscriptionState);

        // The late commit completion must be discarded.
        pendingCommit.complete(null);
        Mockito.verify(subscriptionState, Mockito.never()).assignFromSubscribed(ArgumentMatchers.anySet());
        Assertions.assertNotEquals(MemberState.ACKNOWLEDGING, member.state());
    }

    /**
     * If the member is fenced and rejoins with a new target assignment while a revocation commit
     * from the previous reconciliation is still pending, completing that commit later must not
     * touch the subscription, must not move the member to ACKNOWLEDGING, and must leave the new
     * target assignment awaiting reconciliation.
     */
    @Test
    public void testDelayedReconciliationResultDiscardedIfMemberRejoins() {
        MembershipManagerImpl member = createMemberInStableState();
        Uuid firstTopicId = Uuid.randomUuid();
        // Typed list (instead of a raw List) so the helper calls below are checked by the compiler.
        List<TopicIdPartition> ownedPartitions =
                Collections.singletonList(new TopicIdPartition(firstTopicId, new TopicPartition("topic1", 0)));
        mockOwnedPartitionAndAssignmentReceived(member, firstTopicId, "topic1", ownedPartitions);

        // First reconciliation gets stuck on the commit.
        CompletableFuture<Void> pendingCommit =
                mockNewAssignmentAndRevocationStuckOnCommit(member, firstTopicId, "topic1", Arrays.asList(1, 2), true);
        Assertions.assertEquals(topicIdPartitionsMap(firstTopicId, 1, 2), member.topicPartitionsAwaitingReconciliation());

        // The member is fenced and rejoins, then receives a different assignment.
        testFencedMemberReleasesAssignmentAndTransitionsToJoining(member);
        Mockito.clearInvocations(subscriptionState);
        Uuid secondTopicId = Uuid.randomUuid();
        mockOwnedPartitionAndAssignmentReceived(member, secondTopicId, "topic3", ownedPartitions);
        receiveAssignmentAfterRejoin(secondTopicId, Collections.singletonList(5), member);
        verifyReconciliationNotTriggered(member);
        Map<Uuid, SortedSet<Integer>> newTarget = topicIdPartitionsMap(secondTopicId, 5);
        Assertions.assertEquals(newTarget, member.topicPartitionsAwaitingReconciliation());

        // The late commit completion from the first reconciliation must be discarded.
        pendingCommit.complete(null);
        Mockito.verify(subscriptionState, Mockito.never()).assignFromSubscribed(ArgumentMatchers.anyCollection());
        Assertions.assertNotEquals(MemberState.ACKNOWLEDGING, member.state());
        Assertions.assertEquals(newTarget, member.topicPartitionsAwaitingReconciliation());
    }

    /**
     * A reconciliation stuck on a commit is still applied once the commit completes, even though
     * the topic name for the new assignment was put into the metadata cache in the meantime. The
     * remaining target partitions are then reconciled on the next {@code poll}.
     */
    @Test
    public void testDelayedReconciliationResultAppliedWhenTargetChangedWithMetadataUpdate() {
        // Member that owns topic1-0 and is STABLE.
        MembershipManagerImpl member =
                mockMemberSuccessfullyReceivesAndAcksAssignment(Uuid.randomUuid(), "topic1", Collections.singletonList(0));
        member.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.STABLE, member.state());
        Mockito.clearInvocations(new Object[]{member, subscriptionState});
        Mockito.when(subscriptionState.assignedPartitions())
                .thenReturn(Collections.singleton(new TopicPartition("topic1", 0)));

        // New assignment for topic2 whose revocation gets stuck on the commit.
        Uuid topic2Id = Uuid.randomUuid();
        CompletableFuture<Void> pendingCommit =
                mockNewAssignmentAndRevocationStuckOnCommit(member, topic2Id, "topic2", Arrays.asList(1, 2), false);
        Mockito.verify(metadata).requestUpdate(ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(Collections.singleton(topic2Id), member.topicsAwaitingReconciliation());

        // The topic name shows up in the metadata cache while the commit is still pending.
        Map<Uuid, String> topicNames = Collections.singletonMap(topic2Id, "topic2");
        mockTopicNameInMetadataCache(topicNames, true);
        Assertions.assertEquals(Collections.singleton(topic2Id), member.topicsAwaitingReconciliation());
        verifyReconciliationNotTriggered(member);

        // The commit completes: the in-flight reconciliation result is applied.
        pendingCommit.complete(null);
        Mockito.verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
        Assertions.assertEquals(MemberState.ACKNOWLEDGING, member.state());
        Map<Uuid, SortedSet<Integer>> remainingTarget = topicIdPartitionsMap(topic2Id, 1, 2);
        Assertions.assertEquals(remainingTarget, member.topicPartitionsAwaitingReconciliation());

        // After the ack heartbeat the member goes back to RECONCILING for the remaining target.
        member.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.RECONCILING, member.state());
        Assertions.assertEquals(remainingTarget, member.topicPartitionsAwaitingReconciliation());

        // The next poll reconciles the remaining target.
        member.poll(time.milliseconds());
        Assertions.assertEquals(Collections.emptySet(), member.topicsAwaitingReconciliation());
        Mockito.verify(subscriptionState).assignFromSubscribed(topicPartitions(remainingTarget, topicNames));
    }

    /**
     * A reconciliation stuck on a commit is still applied once the commit completes, even though
     * a larger target assignment (topic1-0 and topic2-0) arrived in the meantime. The remaining
     * part of the new target is then reconciled on the next {@code poll}.
     */
    @Test
    public void testDelayedReconciliationResultAppliedWhenTargetChangedWithNewAssignment() {
        Uuid topicId1 = Uuid.randomUuid();
        TopicIdPartition topic1Partition0 = new TopicIdPartition(topicId1, new TopicPartition("topic1", 0));
        Uuid topicId2 = Uuid.randomUuid();
        TopicIdPartition topic2Partition0 = new TopicIdPartition(topicId2, new TopicPartition("topic2", 0));

        // Declared with the factory's real return type: state(), topicsAwaitingReconciliation()
        // and onHeartbeatRequestSent() are called on it, and the helpers take MembershipManagerImpl.
        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
        Mockito.when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
        Mockito.when(metadata.topicNames()).thenReturn(
                Utils.mkMap(Utils.mkEntry(topicId1, "topic1"), Utils.mkEntry(topicId2, "topic2")));

        // First assignment (topic1-0) gets stuck on the commit.
        CompletableFuture<Void> pendingCommit = mockNewAssignmentAndRevocationStuckOnCommit(
                membershipManager, topicId1, "topic1", Collections.singletonList(0), false);

        // A new target (topic1-0 and topic2-0) arrives while the first reconciliation is in flight.
        receiveAssignment(
                Utils.mkMap(
                        Utils.mkEntry(topicId1, Utils.mkSortedSet(0)),
                        Utils.mkEntry(topicId2, Utils.mkSortedSet(0))),
                membershipManager);
        membershipManager.poll(time.milliseconds());
        verifyReconciliationNotTriggered(membershipManager);
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertEquals(Utils.mkSet(topicId1, topicId2), membershipManager.topicsAwaitingReconciliation());
        Mockito.clearInvocations(membershipManager, commitRequestManager);

        // The commit completes: the in-flight result is applied and only topic2 remains pending.
        pendingCommit.complete(null);
        Assertions.assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
        Assertions.assertEquals(Utils.mkSet(topicId2), membershipManager.topicsAwaitingReconciliation());

        // After the ack heartbeat the member reconciles the rest of the target on the next poll.
        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Mockito.clearInvocations(membershipManager, commitRequestManager);
        membershipManager.poll(time.milliseconds());
        verifyReconciliationTriggeredAndCompleted(membershipManager, Arrays.asList(topic1Partition0, topic2Partition0));
    }

    /**
     * When a new target includes a topic whose name is not yet in metadata, reconciliation is
     * not triggered and a metadata update is requested. Once {@code metadata.topicNames()}
     * returns both topics, the next {@code poll} reconciles the full target.
     */
    @Test
    public void testDelayedMetadataUsedToCompleteAssignment() {
        Uuid topicId1 = Uuid.randomUuid();
        TopicIdPartition topic1Partition0 = new TopicIdPartition(topicId1, new TopicPartition("topic1", 0));
        Uuid topicId2 = Uuid.randomUuid();
        TopicIdPartition topic2Partition0 = new TopicIdPartition(topicId2, new TopicPartition("topic2", 0));

        // Declared with the helper's real return type: state(), topicsAwaitingReconciliation()
        // and onHeartbeatRequestSent() are called on it, and the helpers take MembershipManagerImpl.
        MembershipManagerImpl membershipManager =
                mockMemberSuccessfullyReceivesAndAcksAssignment(topicId1, "topic1", Collections.singletonList(0));
        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.STABLE, membershipManager.state());
        Mockito.when(subscriptionState.assignedPartitions())
                .thenReturn(getTopicPartitions(Collections.singleton(topic1Partition0)));
        Mockito.clearInvocations(membershipManager, subscriptionState);

        // New target adds topic2-0, whose name is not known yet.
        receiveAssignment(
                Utils.mkMap(
                        Utils.mkEntry(topicId1, Utils.mkSortedSet(0)),
                        Utils.mkEntry(topicId2, Utils.mkSortedSet(0))),
                membershipManager);
        membershipManager.poll(time.milliseconds());
        verifyReconciliationNotTriggered(membershipManager);
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertEquals(Collections.singleton(topicId2), membershipManager.topicsAwaitingReconciliation());
        Mockito.verify(metadata).requestUpdate(ArgumentMatchers.anyBoolean());
        Mockito.clearInvocations(membershipManager, commitRequestManager);

        // Metadata now resolves both topics: the next poll completes the reconciliation.
        Mockito.when(metadata.topicNames()).thenReturn(
                Utils.mkMap(Utils.mkEntry(topicId1, "topic1"), Utils.mkEntry(topicId2, "topic2")));
        membershipManager.poll(time.milliseconds());
        verifyReconciliationTriggeredAndCompleted(membershipManager, Arrays.asList(topic1Partition0, topic2Partition0));
    }

    /**
     * Leaving the group from the stable state runs the shared leave-group checks and clears the
     * assignment via {@code assignFromSubscribed} with an empty set.
     */
    @Test
    public void testLeaveGroupWhenStateIsStable() {
        MembershipManagerImpl stableMember = createMemberInStableState();
        testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(stableMember);
        Mockito.verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
    }

    /**
     * A heartbeat response that arrives while the member is LEAVING is ignored: the state, the
     * leave epoch (-1), the member id and the empty current assignment are unchanged, and the
     * leave future stays incomplete.
     */
    @Test
    public void testIgnoreHeartbeatWhenLeavingGroup() {
        MembershipManagerImpl member = createMemberInStableState();
        mockLeaveGroup();
        // Wildcard type instead of a raw CompletableFuture; only completion status is inspected.
        CompletableFuture<?> leaveResult = member.leaveGroup();

        // A response carrying an assignment arrives while leaving.
        member.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(createAssignment(true)).data());

        Assertions.assertEquals(MemberState.LEAVING, member.state());
        Assertions.assertEquals(-1, member.memberEpoch());
        Assertions.assertEquals(MEMBER_ID, member.memberId());
        Assertions.assertTrue(member.currentAssignment().isEmpty());
        Assertions.assertFalse(leaveResult.isDone(), "Leave group result should not complete until the heartbeat request to leave is sent out.");
    }

    /**
     * Leaving the group while RECONCILING passes the shared leave-group checks.
     */
    @Test
    public void testLeaveGroupWhenStateIsReconciling() {
        MembershipManagerImpl reconcilingMember = mockJoinAndReceiveAssignment(false);
        Assertions.assertEquals(MemberState.RECONCILING, reconcilingMember.state());

        testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(reconcilingMember);
    }

    /**
     * A member that has reconciled an assignment of two partitions of one topic passes the
     * shared leave-group checks.
     */
    @Test
    public void testLeaveGroupWhenMemberOwnsAssignment() {
        Uuid topicId = Uuid.randomUuid();
        MembershipManagerImpl member = createMembershipManagerJoiningGroup();
        mockOwnedPartitionAndAssignmentReceived(member, topicId, "topic1", Collections.emptyList());

        // Receiving the assignment alone does not reconcile; the poll does.
        receiveAssignment(topicId, Arrays.asList(0, 1), member);
        verifyReconciliationNotTriggered(member);
        member.poll(time.milliseconds());
        verifyReconciliationTriggeredAndCompleted(member, Arrays.asList(
                new TopicIdPartition(topicId, new TopicPartition("topic1", 0)),
                new TopicIdPartition(topicId, new TopicPartition("topic1", 1))));
        // One entry: both partitions belong to the same topic id.
        Assertions.assertEquals(1, member.currentAssignment().size());

        testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(member);
    }

    /**
     * A second {@code leaveGroup()} call made while the first is still in progress does not
     * fetch the rebalance listener or update the subscription again. Both returned futures
     * complete normally once the heartbeat request is sent.
     */
    @Test
    public void testLeaveGroupWhenMemberAlreadyLeaving() {
        MembershipManagerImpl member = createMemberInStableState();

        // First leave attempt: clears the assignment and stays LEAVING until the heartbeat is sent.
        mockLeaveGroup();
        // Wildcard type instead of a raw CompletableFuture; only completion status is inspected.
        CompletableFuture<?> firstLeave = member.leaveGroup();
        Assertions.assertFalse(firstLeave.isDone());
        Assertions.assertEquals(MemberState.LEAVING, member.state());
        Mockito.verify(subscriptionState).assignFromSubscribed(Collections.emptySet());
        Mockito.clearInvocations(subscriptionState);

        // Second leave attempt while the first one is still pending.
        mockLeaveGroup();
        CompletableFuture<?> secondLeave = member.leaveGroup();
        Mockito.verify(subscriptionState, Mockito.never()).rebalanceListener();
        Assertions.assertFalse(secondLeave.isDone());

        // Sending the heartbeat completes both leave operations successfully.
        member.onHeartbeatRequestSent();
        Assertions.assertTrue(firstLeave.isDone());
        Assertions.assertFalse(firstLeave.isCompletedExceptionally());
        Assertions.assertTrue(secondLeave.isDone());
        Assertions.assertFalse(secondLeave.isCompletedExceptionally());

        // No subscription update since the invocations were cleared after the first leave.
        Mockito.verify(subscriptionState, Mockito.never()).assignFromSubscribed(Collections.emptySet());
    }

    /**
     * Calls {@code leaveGroup()} again after the member has already left the group and is in
     * {@link MemberState#UNSUBSCRIBED}.
     *
     * <p>Checks that the second call returns an already-completed, non-exceptional future,
     * leaves the state at {@code UNSUBSCRIBED}, and neither consults the rebalance listener nor
     * calls {@code assignFromSubscribed} with an empty set again.
     */
    @Test
    public void testLeaveGroupWhenMemberAlreadyLeft() {
        MembershipManagerImpl membershipManager = createMemberInStableState();

        // First leave request, completed by sending the heartbeat.
        mockLeaveGroup();
        CompletableFuture<?> firstLeaveResult = membershipManager.leaveGroup();
        Assertions.assertEquals(MemberState.LEAVING, membershipManager.state());
        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
        Assertions.assertTrue(firstLeaveResult.isDone());
        Assertions.assertFalse(firstLeaveResult.isCompletedExceptionally());
        Mockito.verify(subscriptionState).assignFromSubscribed(Collections.emptySet());

        // Reset recorded invocations so the never() verifications below only see the second call.
        Mockito.clearInvocations(subscriptionState);

        // Second leave request on a member that has already left.
        mockLeaveGroup();
        CompletableFuture<?> secondLeaveResult = membershipManager.leaveGroup();
        Assertions.assertTrue(secondLeaveResult.isDone());
        Assertions.assertFalse(secondLeaveResult.isCompletedExceptionally());
        Assertions.assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
        Mockito.verify(subscriptionState, Mockito.never()).rebalanceListener();
        Mockito.verify(subscriptionState, Mockito.never()).assignFromSubscribed(Collections.emptySet());
    }

    /**
     * Runs the shared fatal-failure checks against a member that is joining the group.
     * Despite the "Unjoined" in the test name, the asserted precondition is
     * {@link MemberState#JOINING}.
     */
    @Test
    public void testFatalFailureWhenStateIsUnjoined() {
        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();

        Assertions.assertEquals(MemberState.JOINING, membershipManager.state());

        testStateUpdateOnFatalFailure(membershipManager);
    }

    /**
     * Runs the shared fatal-failure checks against a member that reached
     * {@link MemberState#STABLE} after receiving a heartbeat response built from a
     * {@code null} assignment.
     */
    @Test
    public void testFatalFailureWhenStateIsStable() {
        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();

        // A heartbeat response created with a null assignment is expected to leave the member STABLE.
        membershipManager.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(null).data());
        Assertions.assertEquals(MemberState.STABLE, membershipManager.state());

        testStateUpdateOnFatalFailure(membershipManager);
    }

    /**
     * Runs the shared fatal-failure checks against a member asserted to be in
     * {@link MemberState#PREPARE_LEAVING}, then completes the callback event returned by the
     * setup helper and checks that the member is still in {@link MemberState#FATAL}.
     */
    @Test
    public void testFatalFailureWhenStateIsPrepareLeaving() {
        MembershipManagerImpl membershipManager = createMemberInStableState();
        ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
        ConsumerRebalanceListenerCallbackCompletedEvent pendingCallback =
            mockPrepareLeavingStuckOnUserCallback(membershipManager, invoker);
        Assertions.assertEquals(MemberState.PREPARE_LEAVING, membershipManager.state());

        testStateUpdateOnFatalFailure(membershipManager);

        // Completing the callback after the fatal failure must not move the member out of FATAL.
        completeCallback(pendingCallback, membershipManager);
        Assertions.assertEquals(MemberState.FATAL, membershipManager.state());
    }

    /**
     * Runs the shared fatal-failure checks against a member asserted to be in
     * {@link MemberState#LEAVING}, and checks that the state is {@link MemberState#FATAL}
     * both before and after a subsequent {@code onHeartbeatRequestSent()}.
     */
    @Test
    public void testFatalFailureWhenStateIsLeaving() {
        MembershipManagerImpl membershipManager = createMemberInStableState();

        // Start leaving the group.
        mockLeaveGroup();
        membershipManager.leaveGroup();
        Assertions.assertEquals(MemberState.LEAVING, membershipManager.state());

        testStateUpdateOnFatalFailure(membershipManager);
        Assertions.assertEquals(MemberState.FATAL, membershipManager.state());

        // The heartbeat being sent afterwards must not change the FATAL state.
        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.FATAL, membershipManager.state());
    }

    /**
     * Transitions to fatal a member that has already left the group
     * ({@link MemberState#UNSUBSCRIBED}) and checks that the rebalance listener's
     * {@code lostCount} stays at zero while the state becomes {@link MemberState#FATAL}.
     */
    @Test
    public void testFatalFailureWhenMemberAlreadyLeft() {
        MembershipManagerImpl membershipManager = createMemberInStableState();

        // Leave the group and let the heartbeat go out so the member ends up UNSUBSCRIBED.
        mockLeaveGroup();
        membershipManager.leaveGroup();
        Assertions.assertEquals(MemberState.LEAVING, membershipManager.state());
        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());

        MockRebalanceListener listener = new MockRebalanceListener();
        Mockito.when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));

        membershipManager.transitionToFatal();

        Assertions.assertEquals(0, listener.lostCount);
        Assertions.assertEquals(MemberState.FATAL, membershipManager.state());
    }

    /**
     * Checks that passing the data of an error heartbeat response to
     * {@code onHeartbeatResponseReceived} throws {@link IllegalArgumentException}.
     */
    @Test
    public void testUpdateStateFailsOnResponsesWithErrors() {
        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
        ConsumerGroupHeartbeatResponse errorResponse = createConsumerGroupHeartbeatResponseWithError();

        Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> membershipManager.onHeartbeatResponseReceived(errorResponse.data()));
    }

    /**
     * A member that is {@link MemberState#RECONCILING} with topics still awaiting
     * reconciliation receives a new assignment for a topic whose name is returned by the
     * metadata mock.
     *
     * <p>After {@code poll()}, the member is expected to be {@link MemberState#ACKNOWLEDGING}
     * with partition 0 of "topic1" passed to {@code assignFromSubscribed}. After the next
     * heartbeat is sent it is expected to be {@link MemberState#STABLE} with no topics
     * awaiting reconciliation.
     */
    @Test
    public void testNewAssignmentReplacesPreviousOneWaitingOnMetadata() {
        MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(false);
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertFalse(membershipManager.topicsAwaitingReconciliation().isEmpty());

        // Sending a heartbeat does not change anything while the assignment is unresolved.
        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertFalse(membershipManager.topicsAwaitingReconciliation().isEmpty());

        // New assignment for a topic id that the metadata mock maps to "topic1".
        Uuid topicId = Uuid.randomUuid();
        Mockito.when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, "topic1"));
        receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
        verifyReconciliationNotTriggered(membershipManager);
        membershipManager.poll(time.milliseconds());

        Set<TopicPartition> expectedAssignment = Collections.singleton(new TopicPartition("topic1", 0));
        Assertions.assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
        Mockito.verify(subscriptionState).assignFromSubscribed(expectedAssignment);

        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.STABLE, membershipManager.state());
        Assertions.assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
    }

    /**
     * A member that is {@link MemberState#RECONCILING} with topics still awaiting
     * reconciliation receives an empty assignment. After {@code poll()} the member is expected
     * to be {@link MemberState#STABLE} and {@code assignFromSubscribed} must never have been
     * called.
     */
    @Test
    public void testNewEmptyAssignmentReplacesPreviousOneWaitingOnMetadata() {
        MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(false);
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertFalse(membershipManager.topicsAwaitingReconciliation().isEmpty());

        // Sending a heartbeat does not change anything while the assignment is unresolved.
        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertFalse(membershipManager.topicsAwaitingReconciliation().isEmpty());

        Mockito.when(metadata.topicNames()).thenReturn(Collections.singletonMap(Uuid.randomUuid(), "topic1"));
        receiveEmptyAssignment(membershipManager);
        verifyReconciliationNotTriggered(membershipManager);
        membershipManager.poll(time.milliseconds());

        Assertions.assertEquals(MemberState.STABLE, membershipManager.state());
        Mockito.verify(subscriptionState, Mockito.never()).assignFromSubscribed(ArgumentMatchers.any());
    }

    /**
     * A member that is {@link MemberState#RECONCILING} receives a new assignment for a topic id
     * while the metadata mock returns no topic names. After {@code poll()} the member is
     * expected to remain {@code RECONCILING}, with the new topic id as the first element of the
     * topics awaiting reconciliation.
     */
    @Test
    public void testNewAssignmentNotInMetadataReplacesPreviousOneWaitingOnMetadata() {
        MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(false);
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertFalse(membershipManager.topicsAwaitingReconciliation().isEmpty());

        // The metadata mock knows no topic names, so the new topic id cannot be resolved.
        Uuid unresolvedTopicId = Uuid.randomUuid();
        Mockito.when(metadata.topicNames()).thenReturn(Collections.emptyMap());
        receiveAssignment(unresolvedTopicId, Collections.singletonList(0), membershipManager);
        verifyReconciliationNotTriggered(membershipManager);
        membershipManager.poll(time.milliseconds());

        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertFalse(membershipManager.topicsAwaitingReconciliation().isEmpty());
        Assertions.assertEquals(unresolvedTopicId, membershipManager.topicsAwaitingReconciliation().iterator().next());
    }

    /**
     * A stable member receives an assignment that leaves it {@link MemberState#RECONCILING}
     * with topics awaiting reconciliation. Once the metadata mock returns the topic name,
     * {@code poll()} is expected to pass partition 1 of "topic1" to
     * {@code assignFromSubscribed}, move the member to {@link MemberState#ACKNOWLEDGING} and
     * leave no topics awaiting reconciliation.
     */
    @Test
    public void testUnresolvedTargetAssignmentIsReconciledWhenMetadataReceived() {
        MembershipManagerImpl membershipManager = createMemberInStableState();
        Uuid topicId = Uuid.randomUuid();
        receiveAssignment(topicId, Collections.singletonList(1), membershipManager);
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertFalse(membershipManager.topicsAwaitingReconciliation().isEmpty());

        // Make the topic name available and stub the subscription state used during reconciliation.
        Mockito.when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, "topic1"));
        Mockito.when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
        Mockito.when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());

        membershipManager.poll(time.milliseconds());

        Mockito.verify(subscriptionState).assignFromSubscribed(Collections.singleton(new TopicPartition("topic1", 1)));
        Assertions.assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
        Assertions.assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
    }

    /**
     * The member receives an assignment for two topics, only one of which has a name in the
     * metadata mock. The topic without a name is expected to stay in the set awaiting
     * reconciliation, a metadata update is expected to be requested, and receiving the same
     * assignment again must keep the member {@link MemberState#RECONCILING} without any call to
     * {@code assignFromSubscribed}.
     */
    @Test
    public void testMemberKeepsUnresolvedAssignmentWaitingForMetadataUntilResolved() {
        Uuid knownTopicId = Uuid.randomUuid();
        Uuid unknownTopicId = Uuid.randomUuid();
        ConsumerGroupHeartbeatResponseData.TopicPartitions knownTopicPartitions =
            new ConsumerGroupHeartbeatResponseData.TopicPartitions()
                .setTopicId(knownTopicId)
                .setPartitions(Collections.singletonList(0));
        ConsumerGroupHeartbeatResponseData.TopicPartitions unknownTopicPartitions =
            new ConsumerGroupHeartbeatResponseData.TopicPartitions()
                .setTopicId(unknownTopicId)
                .setPartitions(Arrays.asList(1, 3));
        ConsumerGroupHeartbeatResponseData.Assignment assignment =
            new ConsumerGroupHeartbeatResponseData.Assignment()
                .setTopicPartitions(Arrays.asList(knownTopicPartitions, unknownTopicPartitions));

        // Only the first topic id has a name in the metadata mock.
        Mockito.when(metadata.topicNames()).thenReturn(Collections.singletonMap(knownTopicId, "topic1"));
        MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(true, assignment);
        Assertions.assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
        Mockito.verify(metadata).requestUpdate(ArgumentMatchers.anyBoolean());
        Assertions.assertEquals(Collections.singleton(unknownTopicId), membershipManager.topicsAwaitingReconciliation());

        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());

        // Receive the very same assignment again; nothing new should be applied.
        Mockito.clearInvocations(subscriptionState);
        membershipManager.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(assignment).data());
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertEquals(Collections.singleton(unknownTopicId), membershipManager.topicsAwaitingReconciliation());
        Mockito.verify(subscriptionState, Mockito.never()).assignFromSubscribed(ArgumentMatchers.anyCollection());
    }

    /**
     * A joining member with no owned partitions receives partitions 0 and 1 of "topic1".
     * Reconciliation is checked as not triggered before {@code poll()} and as triggered and
     * completed with both partitions after it.
     */
    @Test
    public void testReconcileNewPartitionsAssignedWhenNoPartitionOwned() {
        Uuid topicId = Uuid.randomUuid();
        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, "topic1", Collections.emptyList());

        receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
        verifyReconciliationNotTriggered(membershipManager);

        membershipManager.poll(time.milliseconds());
        verifyReconciliationTriggeredAndCompleted(membershipManager, topicIdPartitions(topicId, "topic1", 0, 1));
    }

    /**
     * A stable member that owns partition 0 of "topic1" receives partitions 0, 1 and 2 of the
     * same topic. After {@code poll()}, reconciliation is expected to be triggered and
     * completed with the owned partition followed by the two new ones.
     */
    @Test
    public void testReconcileNewPartitionsAssignedWhenOtherPartitionsOwned() {
        Uuid topicId = Uuid.randomUuid();
        TopicIdPartition ownedPartition = new TopicIdPartition(topicId, new TopicPartition("topic1", 0));
        MembershipManagerImpl membershipManager = createMemberInStableState();
        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, "topic1", Collections.singletonList(ownedPartition));

        receiveAssignment(topicId, Arrays.asList(0, 1, 2), membershipManager);
        verifyReconciliationNotTriggered(membershipManager);
        membershipManager.poll(time.milliseconds());

        // Expected result: the previously owned partition plus the newly assigned ones, in that order.
        List<TopicIdPartition> expectedPartitions = new ArrayList<>();
        expectedPartitions.add(ownedPartition);
        expectedPartitions.addAll(topicIdPartitions(topicId, "topic1", 1, 2));
        verifyReconciliationTriggeredAndCompleted(membershipManager, expectedPartitions);
    }

    /**
     * After reconciling partitions 0 and 1 of "topic1" and reaching {@link MemberState#STABLE},
     * the member receives the same assignment again. The test verifies that no reconciliation
     * is marked as started or completed, that {@code assignFromSubscribed} is not called, and
     * that the member stays {@code STABLE}.
     */
    @Test
    public void testReconciliationSkippedWhenSameAssignmentReceived() {
        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
        Uuid topicId = Uuid.randomUuid();
        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, "topic1", Collections.emptyList());
        List<TopicIdPartition> expectedPartitions = topicIdPartitions(topicId, "topic1", 0, 1);

        // First assignment: reconciled on poll().
        receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
        verifyReconciliationNotTriggered(membershipManager);
        membershipManager.poll(time.milliseconds());
        verifyReconciliationTriggeredAndCompleted(membershipManager, expectedPartitions);
        Assertions.assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());

        // Forget the invocations recorded so far so the never() checks only cover the second round.
        Mockito.clearInvocations(subscriptionState, membershipManager);
        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.STABLE, membershipManager.state());

        // Same assignment again, now with the partitions already owned.
        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, "topic1", expectedPartitions);
        receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);

        Mockito.verify(membershipManager, Mockito.never()).markReconciliationInProgress();
        Mockito.verify(membershipManager, Mockito.never()).markReconciliationCompleted();
        Mockito.verify(subscriptionState, Mockito.never()).assignFromSubscribed(ArgumentMatchers.anyCollection());
        Assertions.assertEquals(MemberState.STABLE, membershipManager.state());
    }

    /**
     * A stable member owning a partition of "topic1" receives an empty assignment, with the
     * revocation helper invoked with {@code false}. After {@code poll()} the shared
     * "all partitions revoked" checks are run.
     */
    @Test
    public void testReconcilePartitionsRevokedNoAutoCommitNoCallbacks() {
        MembershipManagerImpl membershipManager = createMemberInStableState();
        mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1");
        mockRevocationNoCallbacks(false);

        receiveEmptyAssignment(membershipManager);
        verifyReconciliationNotTriggered(membershipManager);

        membershipManager.poll(time.milliseconds());
        testRevocationOfAllPartitionsCompleted(membershipManager);
    }

    /**
     * A stable member owning a partition of "topic1" receives an empty assignment while the
     * future returned by the revocation helper is still pending. The member is expected to
     * stay {@link MemberState#RECONCILING} without calling {@code assignFromSubscribed} until
     * that future completes normally, after which the shared "all partitions revoked" checks
     * are run.
     */
    @Test
    public void testReconcilePartitionsRevokedWithSuccessfulAutoCommitNoCallbacks() {
        MembershipManagerImpl membershipManager = createMemberInStableState();
        mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1");
        CompletableFuture<Void> commitResult = mockRevocationNoCallbacks(true);

        receiveEmptyAssignment(membershipManager);
        verifyReconciliationNotTriggered(membershipManager);
        membershipManager.poll(time.milliseconds());

        // While the future is pending the revocation must not be applied.
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Mockito.verify(subscriptionState, Mockito.never()).assignFromSubscribed(ArgumentMatchers.anyCollection());

        commitResult.complete(null);
        testRevocationOfAllPartitionsCompleted(membershipManager);
    }

    /**
     * Same setup as the successful-commit case, but the future returned by the revocation
     * helper is completed exceptionally with a {@link KafkaException}. The shared
     * "all partitions revoked" checks are still expected to pass afterwards.
     */
    @Test
    public void testReconcilePartitionsRevokedWithFailedAutoCommitCompletesRevocationAnyway() {
        MembershipManagerImpl membershipManager = createMemberInStableState();
        mockOwnedPartition(membershipManager, Uuid.randomUuid(), "topic1");
        CompletableFuture<Void> commitResult = mockRevocationNoCallbacks(true);

        receiveEmptyAssignment(membershipManager);
        verifyReconciliationNotTriggered(membershipManager);
        membershipManager.poll(time.milliseconds());

        // While the future is pending the revocation must not be applied.
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Mockito.verify(subscriptionState, Mockito.never()).assignFromSubscribed(ArgumentMatchers.anyCollection());

        KafkaException commitFailure = new KafkaException("Commit request failed with non-retriable error");
        commitResult.completeExceptionally(commitFailure);
        testRevocationOfAllPartitionsCompleted(membershipManager);
    }

    /**
     * A joining member that owns partition 0 of "topic1" receives partitions 1 and 2 of the
     * same topic. After {@code poll()} the member is expected to be
     * {@link MemberState#ACKNOWLEDGING} with partitions 1 and 2 as its current assignment, no
     * reconciliation in progress, and one call to {@code assignFromSubscribed}.
     */
    @Test
    public void testReconcileNewPartitionsAssignedAndRevoked() {
        Uuid topicId = Uuid.randomUuid();
        TopicIdPartition ownedPartition = new TopicIdPartition(topicId, new TopicPartition("topic1", 0));
        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, "topic1", Collections.singletonList(ownedPartition));
        mockRevocationNoCallbacks(false);

        // The new assignment does not contain the owned partition 0.
        receiveAssignment(topicId, Arrays.asList(1, 2), membershipManager);
        verifyReconciliationNotTriggered(membershipManager);
        membershipManager.poll(time.milliseconds());

        Assertions.assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
        Assertions.assertEquals(topicIdPartitionsMap(topicId, 1, 2), membershipManager.currentAssignment());
        Assertions.assertFalse(membershipManager.reconciliationInProgress());
        Mockito.verify(subscriptionState).assignFromSubscribed(ArgumentMatchers.anyCollection());
    }

    /**
     * The member receives an assignment for a topic id that is initially awaiting
     * reconciliation, and a metadata update is expected to be requested. Once the topic name
     * is made available through the metadata-cache helper, {@code poll()} is expected to
     * reconcile partitions 0 and 1 of "topic1" and leave the member
     * {@link MemberState#ACKNOWLEDGING} with nothing awaiting reconciliation.
     */
    @Test
    public void testMetadataUpdatesReconcilesUnresolvedAssignments() {
        Uuid topicId = Uuid.randomUuid();
        ConsumerGroupHeartbeatResponseData.Assignment assignment = new ConsumerGroupHeartbeatResponseData.Assignment()
            .setTopicPartitions(Collections.singletonList(
                new ConsumerGroupHeartbeatResponseData.TopicPartitions()
                    .setTopicId(topicId)
                    .setPartitions(Arrays.asList(0, 1))));
        MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(false, assignment);

        // The assignment cannot be reconciled yet and a metadata update is requested.
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        verifyReconciliationNotTriggered(membershipManager);
        Assertions.assertEquals(Collections.singleton(topicId), membershipManager.topicsAwaitingReconciliation());
        Mockito.verify(metadata).requestUpdate(ArgumentMatchers.anyBoolean());

        mockTopicNameInMetadataCache(Collections.singletonMap(topicId, "topic1"), true);
        membershipManager.poll(time.milliseconds());

        verifyReconciliationTriggeredAndCompleted(membershipManager, topicIdPartitions(topicId, "topic1", 0, 1));
        Assertions.assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
        Assertions.assertTrue(membershipManager.topicsAwaitingReconciliation().isEmpty());
    }

    /**
     * The member receives an assignment for a topic id that is awaiting reconciliation. When
     * {@code poll()} runs while the metadata mock still returns no topic names, reconciliation
     * must remain untriggered, the topic id must still be awaiting reconciliation, and
     * {@code requestUpdate} must have been called a second time.
     */
    @Test
    public void testMetadataUpdatesRequestsAnotherUpdateIfNeeded() {
        Uuid topicId = Uuid.randomUuid();
        ConsumerGroupHeartbeatResponseData.Assignment assignment = new ConsumerGroupHeartbeatResponseData.Assignment()
            .setTopicPartitions(Collections.singletonList(
                new ConsumerGroupHeartbeatResponseData.TopicPartitions()
                    .setTopicId(topicId)
                    .setPartitions(Arrays.asList(0, 1))));
        MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(false, assignment);

        // The assignment cannot be reconciled yet and a first metadata update is requested.
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        verifyReconciliationNotTriggered(membershipManager);
        Assertions.assertEquals(Collections.singleton(topicId), membershipManager.topicsAwaitingReconciliation());
        Mockito.verify(metadata).requestUpdate(ArgumentMatchers.anyBoolean());

        // Metadata still has no topic names when the member polls.
        Mockito.when(metadata.topicNames()).thenReturn(Collections.emptyMap());
        membershipManager.poll(time.milliseconds());

        verifyReconciliationNotTriggered(membershipManager);
        Assertions.assertEquals(Collections.singleton(topicId), membershipManager.topicsAwaitingReconciliation());
        Mockito.verify(metadata, Mockito.times(2)).requestUpdate(ArgumentMatchers.anyBoolean());
    }

    /**
     * After reconciling partitions 0 and 1 of "topic1", the member receives a new assignment
     * containing only partition 1, with the metadata-cache helper invoked with {@code false}.
     * The test verifies that no metadata update is requested during the second reconciliation
     * and then runs the shared revocation-completed checks for partition 1.
     */
    @Test
    public void testRevokePartitionsUsesTopicNamesLocalCacheWhenMetadataNotAvailable() {
        Uuid topicId = Uuid.randomUuid();
        String topicName = "topic1";
        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
        mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());

        // First assignment: partitions 0 and 1 are reconciled on poll().
        receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
        verifyReconciliationNotTriggered(membershipManager);
        membershipManager.poll(time.milliseconds());
        Assertions.assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());

        // Typed collections: with a raw List the lambda parameter would be Object and could not be unboxed.
        List<Integer> partitions = Arrays.asList(0, 1);
        Set<TopicPartition> assignedPartitions = partitions.stream()
            .map(partition -> new TopicPartition(topicName, partition))
            .collect(Collectors.toSet());
        Assertions.assertEquals(Collections.singletonMap(topicId, new TreeSet<>(partitions)), membershipManager.currentAssignment());
        Assertions.assertFalse(membershipManager.reconciliationInProgress());

        mockAckSent(membershipManager);

        // Second assignment drops partition 0.
        Mockito.when(subscriptionState.assignedPartitions()).thenReturn(assignedPartitions);
        Mockito.when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
        mockRevocationNoCallbacks(false);
        mockTopicNameInMetadataCache(Collections.singletonMap(topicId, topicName), false);
        receiveAssignment(topicId, Collections.singletonList(1), membershipManager);
        membershipManager.poll(time.milliseconds());

        Mockito.verify(metadata, Mockito.never()).requestUpdate(ArgumentMatchers.anyBoolean());
        testRevocationCompleted(membershipManager, topicIdPartitions(topicId, topicName, 1));
    }

    /**
     * Verifies that {@code transitionToJoining()} was invoked while setting up a stable member,
     * and that a later {@code onSubscriptionUpdated()} on that member does not invoke it again.
     */
    @Test
    public void testOnSubscriptionUpdatedTransitionsToJoiningOnlyIfNotInGroup() {
        MembershipManagerImpl membershipManager = createMemberInStableState();
        Mockito.verify(membershipManager).transitionToJoining();

        // Drop the invocation recorded during setup so never() only covers the call below.
        Mockito.clearInvocations(membershipManager);

        membershipManager.onSubscriptionUpdated();
        Mockito.verify(membershipManager, Mockito.never()).transitionToJoining();
    }

    /**
     * Exercises the rebalance listener callbacks through two reconciliations of a stable
     * member: first an assignment of partitions 0 and 1 of "topic1", then an empty assignment.
     *
     * <p>For each round the test checks that the member is {@link MemberState#RECONCILING}
     * with a reconciliation in progress until the callbacks have been performed, that it
     * becomes {@link MemberState#STABLE} after the next heartbeat is sent, and that the
     * listener's revoked/assigned/lost counters have the expected values.
     */
    @Test
    public void testListenerCallbacksBasic() {
        MembershipManagerImpl membershipManager = createMemberInStableState();
        CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener();
        ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
        Uuid topicId = Uuid.randomUuid();

        // Stub the subscription state and metadata used during reconciliation.
        Mockito.when(subscriptionState.assignedPartitions()).thenReturn(Collections.emptySet());
        Mockito.when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
        Mockito.when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
        Mockito.doNothing().when(subscriptionState).markPendingRevocation(ArgumentMatchers.anySet());
        Mockito.when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, "topic1"));

        // Round 1: receive partitions 0 and 1; reconciliation starts on poll().
        receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
        verifyReconciliationNotTriggered(membershipManager);
        membershipManager.poll(time.milliseconds());
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertTrue(membershipManager.reconciliationInProgress());

        // No listener method has run yet; the reconciliation is waiting on the callback.
        Assertions.assertEquals(0, listener.revokedCount());
        Assertions.assertEquals(0, listener.assignedCount());
        Assertions.assertEquals(0, listener.lostCount());
        Assertions.assertTrue(membershipManager.reconciliationInProgress());

        performCallback(membershipManager, invoker, ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, topicPartitions("topic1", 0, 1), true);
        Assertions.assertFalse(membershipManager.reconciliationInProgress());

        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.STABLE, membershipManager.state());
        Assertions.assertEquals(topicIdPartitionsMap(topicId, 0, 1), membershipManager.currentAssignment());
        Assertions.assertEquals(0, listener.revokedCount());
        Assertions.assertEquals(1, listener.assignedCount());
        Assertions.assertEquals(0, listener.lostCount());

        // Round 2: receive an empty assignment while owning partitions 0 and 1.
        Mockito.when(subscriptionState.assignedPartitions()).thenReturn(topicPartitions("topic1", 0, 1));
        receiveEmptyAssignment(membershipManager);
        membershipManager.poll(time.milliseconds());
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertTrue(membershipManager.reconciliationInProgress());

        // The revoked callback alone does not finish the reconciliation; the assigned callback does.
        performCallback(membershipManager, invoker, ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, topicPartitions("topic1", 0, 1), true);
        Assertions.assertTrue(membershipManager.reconciliationInProgress());
        performCallback(membershipManager, invoker, ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, Collections.emptySortedSet(), true);
        Assertions.assertFalse(membershipManager.reconciliationInProgress());

        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.STABLE, membershipManager.state());
        Assertions.assertFalse(membershipManager.reconciliationInProgress());
        Assertions.assertEquals(1, listener.revokedCount());
        Assertions.assertEquals(2, listener.assignedCount());
        Assertions.assertEquals(0, listener.lostCount());
    }

    /**
     * A stable member owning a partition of "topic1" receives an empty assignment while the
     * listener is built with an {@link IllegalArgumentException} as its first constructor
     * argument (the message names {@code onPartitionsRevoked()}).
     *
     * <p>After the revoked callback is performed, the reconciliation is expected to no longer
     * be in progress while the member stays {@link MemberState#RECONCILING}, even after the
     * next heartbeat is sent. Only the listener's revoked counter is expected to have
     * increased.
     */
    @Test
    public void testListenerCallbacksThrowsErrorOnPartitionsRevoked() {
        Uuid topicId = Uuid.randomUuid();
        MembershipManagerImpl membershipManager = createMemberInStableState();
        mockOwnedPartition(membershipManager, topicId, "topic1");

        IllegalArgumentException revokedError = new IllegalArgumentException("Intentional onPartitionsRevoked() error");
        CounterConsumerRebalanceListener listener =
            new CounterConsumerRebalanceListener(Optional.of(revokedError), Optional.empty(), Optional.empty());
        ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
        Mockito.when(subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
        Mockito.doNothing().when(subscriptionState).markPendingRevocation(ArgumentMatchers.anySet());

        // Receive an empty assignment; reconciliation starts on poll().
        receiveEmptyAssignment(membershipManager);
        verifyReconciliationNotTriggered(membershipManager);
        membershipManager.poll(time.milliseconds());
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertEquals(topicIdPartitionsMap(topicId, 0), membershipManager.currentAssignment());
        Assertions.assertTrue(membershipManager.reconciliationInProgress());

        // No listener method has run yet; the reconciliation is waiting on the callback.
        Assertions.assertEquals(0, listener.revokedCount());
        Assertions.assertEquals(0, listener.assignedCount());
        Assertions.assertEquals(0, listener.lostCount());
        Assertions.assertTrue(membershipManager.reconciliationInProgress());

        performCallback(membershipManager, invoker, ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, topicPartitions("topic1", 0), true);
        Assertions.assertFalse(membershipManager.reconciliationInProgress());
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());

        // Sending the heartbeat does not move the member out of RECONCILING.
        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertEquals(1, listener.revokedCount());
        Assertions.assertEquals(0, listener.assignedCount());
        Assertions.assertEquals(0, listener.lostCount());
    }

    /**
     * Exercises reconciliation when the user's {@code onPartitionsAssigned} callback throws.
     * <p>
     * A stable member owning partition 0 of "topic1" receives an empty assignment. The revocation
     * callback and then the failing assignment callback are run; afterwards the reconciliation is
     * no longer in progress and the member is still RECONCILING, also after a heartbeat request
     * has been sent. Each callback is expected to have been counted exactly as asserted below.
     */
    @Test
    public void testListenerCallbacksThrowsErrorOnPartitionsAssigned() {
        MembershipManagerImpl membershipManager = createMemberInStableState();
        Uuid topicId = Uuid.randomUuid();
        String topicName = "topic1";
        mockOwnedPartition(membershipManager, topicId, topicName);

        // Listener that fails only in onPartitionsAssigned.
        CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener(
                Optional.empty(),
                Optional.of(new IllegalArgumentException("Intentional onPartitionsAssigned() error")),
                Optional.empty());
        ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
        Mockito.when(this.subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
        Mockito.doNothing().when(this.subscriptionState).markPendingRevocation(ArgumentMatchers.anySet());

        // The new (empty) target assignment is only acted upon on the next poll.
        receiveEmptyAssignment(membershipManager);
        verifyReconciliationNotTriggered(membershipManager);
        membershipManager.poll(this.time.milliseconds());

        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertEquals(topicIdPartitionsMap(topicId, 0), membershipManager.currentAssignment());
        Assertions.assertTrue(membershipManager.reconciliationInProgress());
        Assertions.assertEquals(0, listener.revokedCount());
        Assertions.assertEquals(0, listener.assignedCount());
        Assertions.assertEquals(0, listener.lostCount());
        Assertions.assertTrue(membershipManager.reconciliationInProgress());

        // Revocation succeeds, so the reconciliation keeps going.
        performCallback(membershipManager, invoker, ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED,
                topicPartitions(topicName, 0), true);
        Assertions.assertTrue(membershipManager.reconciliationInProgress());

        // The assignment callback throws; the reconciliation ends without leaving RECONCILING.
        performCallback(membershipManager, invoker, ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
                Collections.emptySortedSet(), true);
        Assertions.assertFalse(membershipManager.reconciliationInProgress());
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());

        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.RECONCILING, membershipManager.state());
        Assertions.assertEquals(1, listener.revokedCount());
        Assertions.assertEquals(1, listener.assignedCount());
        Assertions.assertEquals(0, listener.lostCount());
    }

    /**
     * Checks that a newly added partition is assigned "awaiting callback" during reconciliation and
     * is only enabled once the {@code onPartitionsAssigned} callback has completed.
     */
    @Test
    public void testAddedPartitionsTemporarilyDisabledAwaitingOnPartitionsAssignedCallback() {
        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
        ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
        String topicName = "topic1";
        SortedSet<TopicPartition> fullAssignment = topicPartitions(topicName, 0, 1);
        SortedSet<TopicPartition> addedPartitions = topicPartitions(topicName, 1);
        CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener();

        // Member owns partition 0 and is told it now owns partitions 0 and 1.
        mockPartitionOwnedAndNewPartitionAdded(topicName, 0, 1, listener, membershipManager);
        membershipManager.poll(this.time.milliseconds());
        Mockito.verify(this.subscriptionState).assignFromSubscribedAwaitingCallback(fullAssignment, addedPartitions);

        performCallback(membershipManager, invoker, ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
                addedPartitions, true);
        Mockito.verify(this.subscriptionState).enablePartitionsAwaitingCallback(addedPartitions);
    }

    /**
     * Checks that a newly added partition is never enabled when the {@code onPartitionsAssigned}
     * callback fails with an exception.
     */
    @Test
    public void testAddedPartitionsNotEnabledAfterFailedOnPartitionsAssignedCallback() {
        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
        ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
        String topicName = "topic1";
        SortedSet<TopicPartition> fullAssignment = topicPartitions(topicName, 0, 1);
        SortedSet<TopicPartition> addedPartitions = topicPartitions(topicName, 1);

        // Listener that throws from onPartitionsAssigned only.
        CounterConsumerRebalanceListener failingListener = new CounterConsumerRebalanceListener(
                Optional.empty(),
                Optional.of(new RuntimeException("onPartitionsAssigned failed!")),
                Optional.empty());

        mockPartitionOwnedAndNewPartitionAdded(topicName, 0, 1, failingListener, membershipManager);
        membershipManager.poll(this.time.milliseconds());
        Mockito.verify(this.subscriptionState).assignFromSubscribedAwaitingCallback(fullAssignment, addedPartitions);

        performCallback(membershipManager, invoker, ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED,
                addedPartitions, true);
        Mockito.verify(this.subscriptionState, Mockito.never())
                .enablePartitionsAwaitingCallback((Collection) ArgumentMatchers.any());
    }

    /**
     * Runs the shared partitions-lost scenario with a listener that does not throw.
     * <p>
     * The member created here is only used to stub the owned partition on the shared
     * {@code subscriptionState}; {@link #testOnPartitionsLost(Optional)} creates its own member.
     */
    @Test
    public void testOnPartitionsLostNoError() {
        MembershipManagerImpl owningMember = createMemberInStableState();
        Uuid topicId = Uuid.randomUuid();
        mockOwnedPartition(owningMember, topicId, "topic1");
        testOnPartitionsLost(Optional.empty());
    }

    /**
     * Runs the shared partitions-lost scenario with a listener whose {@code onPartitionsLost}
     * throws a {@link KafkaException}.
     * <p>
     * The member created here is only used to stub the owned partition on the shared
     * {@code subscriptionState}; {@link #testOnPartitionsLost(Optional)} creates its own member.
     */
    @Test
    public void testOnPartitionsLostError() {
        MembershipManagerImpl owningMember = createMemberInStableState();
        Uuid topicId = Uuid.randomUuid();
        mockOwnedPartition(owningMember, topicId, "topic1");
        testOnPartitionsLost(Optional.of(new KafkaException("Intentional error for test")));
    }

    /**
     * Sets the member up as owning one partition of a topic and then delivers a target assignment
     * containing that partition plus a new one.
     *
     * @param str                              topic name
     * @param i                                partition already owned by the member
     * @param i2                               partition added by the new assignment
     * @param counterConsumerRebalanceListener listener returned by the stubbed subscription state (may be null)
     * @param membershipManagerImpl            member under test
     */
    private void mockPartitionOwnedAndNewPartitionAdded(String str, int i, int i2, CounterConsumerRebalanceListener counterConsumerRebalanceListener, MembershipManagerImpl membershipManagerImpl) {
        Uuid topicId = Uuid.randomUuid();
        TopicPartition ownedPartition = new TopicPartition(str, i);

        // Current state: only the owned partition is assigned.
        Mockito.when(this.subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(ownedPartition));
        membershipManagerImpl.updateCurrentAssignment(Collections.singleton(new TopicIdPartition(topicId, ownedPartition)));

        // Topic id resolves to its name, and the listener is available to the reconciliation.
        Mockito.when(this.metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, str));
        Mockito.when(this.subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
        Mockito.when(this.subscriptionState.rebalanceListener()).thenReturn(Optional.ofNullable(counterConsumerRebalanceListener));

        // New target assignment: owned partition plus the added one.
        receiveAssignment(topicId, Arrays.asList(i, i2), membershipManagerImpl);
    }

    /**
     * Shared scenario: a stable member is fenced, the {@code onPartitionsLost} callback is run for
     * partition 0 of "topic1", and after a heartbeat request is sent the member is JOINING with
     * only the lost-callback counted.
     *
     * @param optional error the listener should throw from {@code onPartitionsLost}, if any
     */
    private void testOnPartitionsLost(Optional<RuntimeException> optional) {
        MembershipManagerImpl membershipManager = createMemberInStableState();
        CounterConsumerRebalanceListener listener =
                new CounterConsumerRebalanceListener(Optional.empty(), Optional.empty(), optional);
        ConsumerRebalanceListenerInvoker invoker = consumerRebalanceListenerInvoker();
        Mockito.when(this.subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
        Mockito.doNothing().when(this.subscriptionState).markPendingRevocation(ArgumentMatchers.anySet());

        membershipManager.transitionToFenced();
        Assertions.assertEquals(MemberState.FENCED, membershipManager.state());
        Assertions.assertEquals(Collections.emptyMap(), membershipManager.currentAssignment());
        Assertions.assertEquals(0, listener.revokedCount());
        Assertions.assertEquals(0, listener.assignedCount());
        Assertions.assertEquals(0, listener.lostCount());

        performCallback(membershipManager, invoker, ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST,
                topicPartitions("topic1", 0), true);
        membershipManager.onHeartbeatRequestSent();

        Assertions.assertEquals(MemberState.JOINING, membershipManager.state());
        Assertions.assertEquals(0, listener.revokedCount());
        Assertions.assertEquals(0, listener.assignedCount());
        Assertions.assertEquals(1, listener.lostCount());
    }

    /**
     * Builds a listener invoker wired to this test's {@code subscriptionState}, using a fresh log
     * context, a {@code MockTime(1L)} clock and a metrics manager backed by a new {@link Metrics}.
     */
    private ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker() {
        LogContext logContext = new LogContext();
        Time mockTime = new MockTime(1L);
        RebalanceCallbackMetricsManager metricsManager = new RebalanceCallbackMetricsManager(new Metrics());
        return new ConsumerRebalanceListenerInvoker(logContext, this.subscriptionState, mockTime, metricsManager);
    }

    /**
     * Flattens an assignment keyed by topic id into a sorted set of {@link TopicPartition}s.
     *
     * @param map  partitions grouped by topic id
     * @param map2 topic id to topic name lookup; a missing id yields a partition with a null topic name
     * @return the partitions, ordered by {@code Utils.TopicPartitionComparator}
     */
    private static SortedSet<TopicPartition> topicPartitions(Map<Uuid, SortedSet<Integer>> map, Map<Uuid, String> map2) {
        // Typed collection instead of a raw TreeSet with an unchecked Comparator cast.
        SortedSet<TopicPartition> result = new TreeSet<>(new Utils.TopicPartitionComparator());
        // Iterate entries directly rather than looking each key up again.
        for (Map.Entry<Uuid, SortedSet<Integer>> entry : map.entrySet()) {
            String topicName = map2.get(entry.getKey());
            for (int partition : entry.getValue()) {
                result.add(new TopicPartition(topicName, partition));
            }
        }
        return result;
    }

    /**
     * Builds a sorted set of partitions of a single topic.
     *
     * @param str  topic name
     * @param iArr partition numbers
     * @return the partitions, ordered by {@code Utils.TopicPartitionComparator}
     */
    private SortedSet<TopicPartition> topicPartitions(String str, int... iArr) {
        // Typed collection instead of a raw TreeSet with an unchecked Comparator cast.
        SortedSet<TopicPartition> result = new TreeSet<>(new Utils.TopicPartitionComparator());
        for (int partition : iArr) {
            result.add(new TopicPartition(str, partition));
        }
        return result;
    }

    /**
     * Builds a sorted set of {@link TopicIdPartition}s for a single topic.
     *
     * @param uuid topic id
     * @param str  topic name
     * @param iArr partition numbers
     * @return the partitions, ordered by {@code Utils.TopicIdPartitionComparator}
     */
    private SortedSet<TopicIdPartition> topicIdPartitionsSet(Uuid uuid, String str, int... iArr) {
        // Typed collection instead of a raw TreeSet with an unchecked Comparator cast.
        SortedSet<TopicIdPartition> result = new TreeSet<>(new Utils.TopicIdPartitionComparator());
        for (int partition : iArr) {
            result.add(new TopicIdPartition(uuid, new TopicPartition(str, partition)));
        }
        return result;
    }

    /**
     * List variant of {@link #topicIdPartitionsSet(Uuid, String, int...)}: same elements, in the
     * sorted set's iteration order.
     *
     * @param uuid topic id
     * @param str  topic name
     * @param iArr partition numbers
     */
    private List<TopicIdPartition> topicIdPartitions(Uuid uuid, String str, int... iArr) {
        // Diamond keeps the element type; the original used a raw ArrayList.
        return new ArrayList<>(topicIdPartitionsSet(uuid, str, iArr));
    }

    /**
     * Builds a single-entry assignment map from a topic id to its sorted partition numbers.
     *
     * @param uuid topic id used as the only key
     * @param iArr partition numbers
     * @return an immutable singleton map (see {@link Collections#singletonMap})
     */
    private Map<Uuid, SortedSet<Integer>> topicIdPartitionsMap(Uuid uuid, int... iArr) {
        // Typed set instead of a raw TreeSet.
        SortedSet<Integer> partitions = new TreeSet<>();
        for (int partition : iArr) {
            partitions.add(partition);
        }
        return Collections.singletonMap(uuid, partitions);
    }

    /**
     * Takes the single pending "callback needed" event off the background queue, checks it,
     * invokes the corresponding rebalance listener callback, and optionally reports completion
     * back to the member.
     *
     * @param membershipManagerImpl               member that receives the completion, when {@code z} is true
     * @param consumerRebalanceListenerInvoker    invoker used to run the user callback
     * @param consumerRebalanceListenerMethodName callback the queued event is expected to request
     * @param sortedSet                           partitions the queued event is expected to carry
     * @param z                                   whether to pass the completed event to {@link #completeCallback}
     * @return the completed event produced by invoking the callback
     */
    private ConsumerRebalanceListenerCallbackCompletedEvent performCallback(MembershipManagerImpl membershipManagerImpl, ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker, ConsumerRebalanceListenerMethodName consumerRebalanceListenerMethodName, SortedSet<TopicPartition> sortedSet, boolean z) {
        // Exactly one event must be queued, and it must be a callback-needed event.
        Assertions.assertEquals(1, this.backgroundEventQueue.size());
        Assertions.assertNotNull(this.backgroundEventQueue.peek());
        Assertions.assertInstanceOf(ConsumerRebalanceListenerCallbackNeededEvent.class, this.backgroundEventQueue.peek());

        // Explicit downcast: the head element's type was verified by the assertion above.
        ConsumerRebalanceListenerCallbackNeededEvent neededEvent =
                (ConsumerRebalanceListenerCallbackNeededEvent) this.backgroundEventQueue.poll();
        Assertions.assertNotNull(neededEvent);
        Assertions.assertEquals(consumerRebalanceListenerMethodName, neededEvent.methodName());
        Assertions.assertEquals(sortedSet, neededEvent.partitions());

        ConsumerRebalanceListenerCallbackCompletedEvent completedEvent = AsyncKafkaConsumer.invokeRebalanceCallbacks(
                consumerRebalanceListenerInvoker, neededEvent.methodName(), neededEvent.partitions(), neededEvent.future());
        if (z) {
            completeCallback(completedEvent, membershipManagerImpl);
        }
        return completedEvent;
    }

    /**
     * Hands a completed rebalance-listener callback event to the member.
     *
     * @param consumerRebalanceListenerCallbackCompletedEvent result of running the user callback
     * @param membershipManagerImpl                           member to notify
     */
    private void completeCallback(ConsumerRebalanceListenerCallbackCompletedEvent consumerRebalanceListenerCallbackCompletedEvent, MembershipManagerImpl membershipManagerImpl) {
        membershipManagerImpl.consumerRebalanceListenerCallbackCompleted(consumerRebalanceListenerCallbackCompletedEvent);
    }

    /**
     * Asserts that the member's epoch is not 0 and that the subscription state's rebalance
     * listener was never looked up.
     *
     * @param membershipManagerImpl member to inspect
     */
    private void testFenceIsNoOp(MembershipManagerImpl membershipManagerImpl) {
        int currentEpoch = membershipManagerImpl.memberEpoch();
        Assertions.assertNotEquals(0, currentEpoch);
        Mockito.verify(this.subscriptionState, Mockito.never()).rebalanceListener();
    }

    /**
     * A member that joined and holds an assignment reports epoch -1 after transitioning to stale.
     */
    @Test
    public void testTransitionToStaled() {
        Uuid topicId = Uuid.randomUuid();
        MembershipManager member = memberJoinWithAssignment("topic", topicId);
        member.transitionToStale();
        Assertions.assertEquals(-1, member.memberEpoch());
    }

    /**
     * After a stale member sends a heartbeat request it is JOINING, its current assignment is
     * empty, and the subscription state reports no assigned partitions.
     */
    @Test
    public void testHeartbeatSentOnStaledMember() {
        MembershipManagerImpl membershipManager = createMemberInStableState();
        String topicName = "topic";
        TopicPartition ownedPartition = new TopicPartition(topicName, 0);
        this.subscriptionState.subscribe(Collections.singleton(topicName), Optional.empty());
        this.subscriptionState.assignFromSubscribed(Collections.singleton(ownedPartition));

        membershipManager.transitionToStale();
        membershipManager.onHeartbeatRequestSent();

        Assertions.assertEquals(MemberState.JOINING, membershipManager.state());
        Assertions.assertTrue(membershipManager.currentAssignment().isEmpty());
        Assertions.assertTrue(this.subscriptionState.assignedPartitions().isEmpty());
    }

    /**
     * A JOINING member that receives an empty assignment is STABLE after the next poll, and no
     * reconciliation is marked before that poll.
     */
    @Test
    public void testMemberJoiningTransitionsToStableWhenReceivingEmptyAssignment() {
        MembershipManagerImpl joiningMember = createMembershipManagerJoiningGroup(null);
        Assertions.assertEquals(MemberState.JOINING, joiningMember.state());

        receiveEmptyAssignment(joiningMember);
        verifyReconciliationNotTriggered(joiningMember);

        joiningMember.poll(this.time.milliseconds());
        Assertions.assertEquals(MemberState.STABLE, joiningMember.state());
    }

    /**
     * Creates a joining member with no owned partitions, delivers the given assignment, polls, and
     * verifies that the reconciliation was triggered and completed for exactly those partitions.
     *
     * @param uuid topic id of the assigned topic
     * @param str  topic name the id resolves to
     * @param list assigned partition numbers
     * @return the member, in the state checked by {@link #verifyReconciliationTriggeredAndCompleted}
     */
    private MembershipManagerImpl mockMemberSuccessfullyReceivesAndAcksAssignment(Uuid uuid, String str, List<Integer> list) {
        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
        mockOwnedPartitionAndAssignmentReceived(membershipManager, uuid, str, Collections.emptyList());
        receiveAssignment(uuid, list, membershipManager);
        verifyReconciliationNotTriggered(membershipManager);
        membershipManager.poll(this.time.milliseconds());

        // Typed stream result; the original went through a raw (List) cast.
        List<TopicIdPartition> expectedAssignment = list.stream()
                .map(partition -> new TopicIdPartition(uuid, new TopicPartition(str, partition)))
                .collect(Collectors.toList());
        verifyReconciliationTriggeredAndCompleted(membershipManager, expectedAssignment);
        return membershipManager;
    }

    /**
     * Delivers an empty assignment while the auto-commit future is left incomplete, polls, and
     * checks that the member entered RECONCILING. Recorded invocations on the member are cleared
     * afterwards.
     *
     * @param membershipManagerImpl member under test
     * @return the pending commit future stubbed by {@link #mockRevocationNoCallbacks(boolean)}
     */
    private CompletableFuture<Void> mockEmptyAssignmentAndRevocationStuckOnCommit(MembershipManagerImpl membershipManagerImpl) {
        CompletableFuture<Void> pendingCommit = mockRevocationNoCallbacks(true);

        receiveEmptyAssignment(membershipManagerImpl);
        verifyReconciliationNotTriggered(membershipManagerImpl);
        membershipManagerImpl.poll(this.time.milliseconds());
        verifyReconciliationTriggered(membershipManagerImpl);

        Mockito.clearInvocations(membershipManagerImpl);
        Assertions.assertEquals(MemberState.RECONCILING, membershipManagerImpl.state());
        return pendingCommit;
    }

    /**
     * Delivers a new assignment while the auto-commit future is left incomplete, polls, and checks
     * that the member entered RECONCILING. Recorded invocations on the member are cleared
     * afterwards.
     *
     * @param membershipManagerImpl member under test
     * @param uuid                  topic id of the assigned topic
     * @param str                   topic name
     * @param list                  assigned partition numbers
     * @param z                     whether to stub the metadata so the topic id resolves to {@code str}
     * @return the pending commit future stubbed by {@link #mockRevocationNoCallbacks(boolean)}
     */
    private CompletableFuture<Void> mockNewAssignmentAndRevocationStuckOnCommit(MembershipManagerImpl membershipManagerImpl, Uuid uuid, String str, List<Integer> list, boolean z) {
        CompletableFuture<Void> pendingCommit = mockRevocationNoCallbacks(true);
        if (z) {
            Map<Uuid, String> topicNames = Collections.singletonMap(uuid, str);
            Mockito.when(this.metadata.topicNames()).thenReturn(topicNames);
        }

        receiveAssignment(uuid, list, membershipManagerImpl);
        verifyReconciliationNotTriggered(membershipManagerImpl);
        membershipManagerImpl.poll(this.time.milliseconds());
        verifyReconciliationTriggered(membershipManagerImpl);

        Mockito.clearInvocations(membershipManagerImpl);
        Assertions.assertEquals(MemberState.RECONCILING, membershipManagerImpl.state());
        return pendingCommit;
    }

    /**
     * Verifies that the member marked a reconciliation as in progress and is RECONCILING.
     *
     * @param membershipManagerImpl member whose recorded invocations are verified with Mockito
     */
    private void verifyReconciliationTriggered(MembershipManagerImpl membershipManagerImpl) {
        Mockito.verify(membershipManagerImpl).markReconciliationInProgress();
        MemberState currentState = membershipManagerImpl.state();
        Assertions.assertEquals(MemberState.RECONCILING, currentState);
    }

    /**
     * Verifies that the member has neither started nor completed a reconciliation so far.
     *
     * @param membershipManagerImpl member whose recorded invocations are verified with Mockito
     */
    private void verifyReconciliationNotTriggered(MembershipManagerImpl membershipManagerImpl) {
        Mockito.verify(membershipManagerImpl, Mockito.never()).markReconciliationInProgress();
        Mockito.verify(membershipManagerImpl, Mockito.never()).markReconciliationCompleted();
    }

    /**
     * Verifies a finished reconciliation: the member is ACKNOWLEDGING, reconciliation was marked
     * in progress and completed, the subscription state was assigned the expected partitions, the
     * member's current assignment matches, and the auto-commit timer was reset.
     *
     * @param membershipManagerImpl member to inspect
     * @param list                  partitions expected to be assigned after the reconciliation
     */
    private void verifyReconciliationTriggeredAndCompleted(MembershipManagerImpl membershipManagerImpl, List<TopicIdPartition> list) {
        Assertions.assertEquals(MemberState.ACKNOWLEDGING, membershipManagerImpl.state());
        Mockito.verify(membershipManagerImpl).markReconciliationInProgress();
        Mockito.verify(membershipManagerImpl).markReconciliationCompleted();
        Assertions.assertFalse(membershipManagerImpl.reconciliationInProgress());

        // Typed set instead of a raw HashSet; equality with the actual argument is unchanged.
        Set<TopicPartition> expectedPartitions = new HashSet<>(buildTopicPartitions(list));
        Mockito.verify(this.subscriptionState).assignFromSubscribed(expectedPartitions);
        Assertions.assertEquals(assignmentByTopicId(list), membershipManagerImpl.currentAssignment());
        Mockito.verify(this.commitRequestManager).resetAutoCommitTimer();
    }

    /**
     * Maps each {@link TopicIdPartition} to its {@link TopicPartition}, preserving order.
     *
     * @param list partitions carrying topic ids
     * @return the corresponding topic partitions
     */
    private List<TopicPartition> buildTopicPartitions(List<TopicIdPartition> list) {
        // Method reference and typed collect replace the synthetic lambda and raw (List) cast.
        return list.stream()
                .map(TopicIdPartition::topicPartition)
                .collect(Collectors.toList());
    }

    /**
     * Simulates the acknowledgement being sent by notifying the member that a heartbeat request
     * went out.
     *
     * @param membershipManagerImpl member to notify
     */
    private void mockAckSent(MembershipManagerImpl membershipManagerImpl) {
        membershipManagerImpl.onHeartbeatRequestSent();
    }

    /**
     * Stubs the metadata's topic-name lookup.
     *
     * @param map topic id to name mapping to expose when {@code z} is true
     * @param z   true to return {@code map}; false to return an empty map
     */
    private void mockTopicNameInMetadataCache(Map<Uuid, String> map, boolean z) {
        Map<Uuid, String> cachedTopicNames = z ? map : Collections.emptyMap();
        Mockito.when(this.metadata.topicNames()).thenReturn(cachedTopicNames);
    }

    /**
     * Stubs the collaborators for a revocation without a user rebalance listener.
     *
     * @param z true to enable auto-commit and stub the commit with a future the caller must
     *          complete; false to leave the commit manager untouched
     * @return the pending commit future when {@code z} is true, otherwise an already completed future
     */
    private CompletableFuture<Void> mockRevocationNoCallbacks(boolean z) {
        Mockito.doNothing().when(this.subscriptionState).markPendingRevocation(ArgumentMatchers.anySet());
        Mockito.when(this.subscriptionState.rebalanceListener()).thenReturn(Optional.empty()).thenReturn(Optional.empty());

        if (z) {
            Mockito.when(this.commitRequestManager.autoCommitEnabled()).thenReturn(true);
            CompletableFuture<Void> pendingCommit = new CompletableFuture<>();
            Mockito.when(this.commitRequestManager.maybeAutoCommitAllConsumedNow((Optional) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())).thenReturn(pendingCommit);
            return pendingCommit;
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Stubs the subscription state so that partition 0 of "topic1" is assigned, partitions are
     * auto-assigned, and no rebalance listener is registered.
     */
    private void mockMemberHasAutoAssignedPartition() {
        TopicPartition ownedPartition = new TopicPartition("topic1", 0);
        Mockito.when(this.subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(ownedPartition));
        Mockito.when(this.subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
        Mockito.when(this.subscriptionState.rebalanceListener()).thenReturn(Optional.empty()).thenReturn(Optional.empty());
    }

    /**
     * Verifies a completed revocation that left the member with no partitions.
     *
     * @param membershipManagerImpl member to inspect
     */
    private void testRevocationOfAllPartitionsCompleted(MembershipManagerImpl membershipManagerImpl) {
        List<TopicIdPartition> nothingLeft = Collections.emptyList();
        testRevocationCompleted(membershipManagerImpl, nothingLeft);
    }

    /**
     * Verifies a completed revocation: the member is ACKNOWLEDGING with the expected remaining
     * assignment, no reconciliation is in progress, and the subscription state both marked
     * partitions as pending revocation and was assigned the remaining partitions.
     *
     * @param membershipManagerImpl member to inspect
     * @param list                  partitions expected to remain assigned
     */
    private void testRevocationCompleted(MembershipManagerImpl membershipManagerImpl, List<TopicIdPartition> list) {
        Assertions.assertEquals(MemberState.ACKNOWLEDGING, membershipManagerImpl.state());
        Assertions.assertEquals(assignmentByTopicId(list), membershipManagerImpl.currentAssignment());
        Assertions.assertFalse(membershipManagerImpl.reconciliationInProgress());
        Mockito.verify(this.subscriptionState).markPendingRevocation(ArgumentMatchers.anySet());

        // Typed set instead of a raw HashSet; equality with the actual argument is unchanged.
        Set<TopicPartition> remainingPartitions = new HashSet<>(buildTopicPartitions(list));
        Mockito.verify(this.subscriptionState).assignFromSubscribed(remainingPartitions);
    }

    /**
     * Groups partitions by topic id, with each topic's partition numbers kept in a sorted set.
     *
     * @param list partitions to group
     * @return a mutable map from topic id to its sorted partition numbers; empty for an empty list
     */
    private Map<Uuid, SortedSet<Integer>> assignmentByTopicId(List<TopicIdPartition> list) {
        // Typed map and sets replace the raw HashMap/TreeSet and the (SortedSet) cast.
        Map<Uuid, SortedSet<Integer>> byTopicId = new HashMap<>();
        for (TopicIdPartition topicIdPartition : list) {
            byTopicId.computeIfAbsent(topicIdPartition.topicId(), topicId -> new TreeSet<>())
                    .add(topicIdPartition.partition());
        }
        return byTopicId;
    }

    /**
     * Makes the member and the stubbed subscription state agree on the currently owned
     * partitions, and stubs metadata so the topic id resolves to the given name. No rebalance
     * listener is registered.
     *
     * @param membershipManagerImpl member whose current assignment is set
     * @param uuid                  topic id
     * @param str                   topic name the id resolves to
     * @param collection            partitions the member currently owns
     */
    private void mockOwnedPartitionAndAssignmentReceived(MembershipManagerImpl membershipManagerImpl, Uuid uuid, String str, Collection<TopicIdPartition> collection) {
        Mockito.when(this.subscriptionState.assignedPartitions()).thenReturn(getTopicPartitions(collection));
        // Typed copy instead of a raw HashSet.
        membershipManagerImpl.updateCurrentAssignment(new HashSet<>(collection));
        Mockito.when(this.metadata.topicNames()).thenReturn(Collections.singletonMap(uuid, str));
        Mockito.when(this.subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
        Mockito.when(this.subscriptionState.rebalanceListener()).thenReturn(Optional.empty()).thenReturn(Optional.empty());
    }

    /**
     * Converts topic-id partitions into a set of new {@link TopicPartition}s built from each
     * element's topic name and partition number.
     *
     * @param collection partitions carrying topic ids
     * @return the corresponding topic partitions, de-duplicated
     */
    private Set<TopicPartition> getTopicPartitions(Collection<TopicIdPartition> collection) {
        // Typed collect result; the original went through a raw (Set) cast.
        return collection.stream()
                .map(topicIdPartition -> new TopicPartition(topicIdPartition.topic(), topicIdPartition.partition()))
                .collect(Collectors.toSet());
    }

    /**
     * Makes the member own partition 0 of the given topic and stubs the subscription state to
     * report that partition as assigned and auto-assigned.
     *
     * @param membershipManagerImpl member whose current assignment is set
     * @param uuid                  topic id
     * @param str                   topic name
     */
    private void mockOwnedPartition(MembershipManagerImpl membershipManagerImpl, Uuid uuid, String str) {
        TopicPartition ownedPartition = new TopicPartition(str, 0);
        TopicIdPartition ownedWithId = new TopicIdPartition(uuid, ownedPartition);
        membershipManagerImpl.updateCurrentAssignment(Collections.singleton(ownedWithId));
        Mockito.when(this.subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(ownedPartition));
        Mockito.when(this.subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
    }

    /**
     * Joins a member and delivers the two-topic assignment built by {@link #createAssignment(boolean)}.
     *
     * @param z passed to both {@code createAssignment} and the two-argument overload
     * @return the member after the assignment was received and polled
     */
    private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean z) {
        ConsumerGroupHeartbeatResponseData.Assignment targetAssignment = createAssignment(z);
        return mockJoinAndReceiveAssignment(z, targetAssignment);
    }

    /**
     * Creates a joining member, delivers the given assignment in a heartbeat response and polls.
     *
     * @param z          true if the subscription state is expected to have been assigned partitions
     *                   exactly once; false if it must never have been
     * @param assignment target assignment carried by the heartbeat response
     * @return the member after the poll
     */
    private MembershipManagerImpl mockJoinAndReceiveAssignment(boolean z, ConsumerGroupHeartbeatResponseData.Assignment assignment) {
        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup();
        ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(assignment);
        Mockito.when(this.subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
        Mockito.when(this.subscriptionState.rebalanceListener()).thenReturn(Optional.empty()).thenReturn(Optional.empty());

        membershipManager.onHeartbeatResponseReceived(heartbeatResponse.data());
        membershipManager.poll(this.time.milliseconds());

        if (!z) {
            Mockito.verify(this.subscriptionState, Mockito.never()).assignFromSubscribed(ArgumentMatchers.anyCollection());
        } else {
            Mockito.verify(this.subscriptionState).assignFromSubscribed(ArgumentMatchers.anyCollection());
        }
        return membershipManager;
    }

    /**
     * Creates a member in STABLE state, passing {@code null} as the string argument of
     * {@link #createMemberInStableState(String)}.
     */
    private MembershipManagerImpl createMemberInStableState() {
        return createMemberInStableState(null);
    }

    /**
     * Creates a joining member and delivers a successful heartbeat response that carries no
     * assignment, then asserts the member is STABLE.
     *
     * @param str forwarded to {@code createMembershipManagerJoiningGroup(String)}
     * @return the member in STABLE state
     */
    private MembershipManagerImpl createMemberInStableState(String str) {
        MembershipManagerImpl membershipManager = createMembershipManagerJoiningGroup(str);
        ConsumerGroupHeartbeatResponse responseWithoutAssignment = createConsumerGroupHeartbeatResponse(null);
        membershipManager.onHeartbeatResponseReceived(responseWithoutAssignment.data());
        Assertions.assertEquals(MemberState.STABLE, membershipManager.state());
        return membershipManager;
    }

    /**
     * Delivers a heartbeat response whose assignment contains one entry per topic id in the map.
     *
     * @param map               partition numbers grouped by topic id
     * @param membershipManager member that receives the response
     */
    private void receiveAssignment(Map<Uuid, SortedSet<Integer>> map, MembershipManager membershipManager) {
        // Typed stream pipeline; the original relied on raw (List)/(Uuid)/(Collection) casts.
        List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignedTopics = map.entrySet().stream()
                .map(entry -> new ConsumerGroupHeartbeatResponseData.TopicPartitions()
                        .setTopicId(entry.getKey())
                        .setPartitions(new ArrayList<>(entry.getValue())))
                .collect(Collectors.toList());
        ConsumerGroupHeartbeatResponseData.Assignment targetAssignment =
                new ConsumerGroupHeartbeatResponseData.Assignment().setTopicPartitions(assignedTopics);
        membershipManager.onHeartbeatResponseReceived(createConsumerGroupHeartbeatResponse(targetAssignment).data());
    }

    /**
     * Delivers a heartbeat response assigning the given partitions of a single topic.
     *
     * @param uuid              topic id
     * @param list              assigned partition numbers
     * @param membershipManager member that receives the response
     */
    private void receiveAssignment(Uuid uuid, List<Integer> list, MembershipManager membershipManager) {
        ConsumerGroupHeartbeatResponseData.TopicPartitions topicAssignment =
                new ConsumerGroupHeartbeatResponseData.TopicPartitions().setTopicId(uuid).setPartitions(list);
        ConsumerGroupHeartbeatResponseData.Assignment targetAssignment =
                new ConsumerGroupHeartbeatResponseData.Assignment().setTopicPartitions(Collections.singletonList(topicAssignment));
        ConsumerGroupHeartbeatResponse response = createConsumerGroupHeartbeatResponse(targetAssignment);
        membershipManager.onHeartbeatResponseReceived(response.data());
    }

    /**
     * Delivers a heartbeat response assigning the given partitions of a single topic, built with
     * the bumped member epoch (see {@link #createConsumerGroupHeartbeatResponseWithBumpedEpoch}).
     *
     * @param uuid              topic id
     * @param list              assigned partition numbers
     * @param membershipManager member that receives the response
     */
    private void receiveAssignmentAfterRejoin(Uuid uuid, List<Integer> list, MembershipManager membershipManager) {
        ConsumerGroupHeartbeatResponseData.TopicPartitions topicAssignment =
                new ConsumerGroupHeartbeatResponseData.TopicPartitions().setTopicId(uuid).setPartitions(list);
        ConsumerGroupHeartbeatResponseData.Assignment targetAssignment =
                new ConsumerGroupHeartbeatResponseData.Assignment().setTopicPartitions(Collections.singletonList(topicAssignment));
        ConsumerGroupHeartbeatResponse response = createConsumerGroupHeartbeatResponseWithBumpedEpoch(targetAssignment);
        membershipManager.onHeartbeatResponseReceived(response.data());
    }

    /**
     * Delivers a heartbeat response whose assignment has an empty topic-partition list.
     *
     * @param membershipManager member that receives the response
     */
    private void receiveEmptyAssignment(MembershipManager membershipManager) {
        ConsumerGroupHeartbeatResponseData.Assignment emptyAssignment =
                new ConsumerGroupHeartbeatResponseData.Assignment().setTopicPartitions(Collections.emptyList());
        ConsumerGroupHeartbeatResponse response = createConsumerGroupHeartbeatResponse(emptyAssignment);
        membershipManager.onHeartbeatResponseReceived(response.data());
    }

    /**
     * Fences a member that has one auto-assigned partition and no rebalance listener, then checks
     * that it keeps its member id, has epoch 0 and is JOINING.
     *
     * @param membershipManager member to fence
     */
    private void testFencedMemberReleasesAssignmentAndTransitionsToJoining(MembershipManager membershipManager) {
        mockMemberHasAutoAssignedPartition();
        membershipManager.transitionToFenced();

        String memberIdAfterFence = membershipManager.memberId();
        Assertions.assertEquals(MEMBER_ID, memberIdAfterFence);
        int epochAfterFence = membershipManager.memberEpoch();
        Assertions.assertEquals(0, epochAfterFence);
        MemberState stateAfterFence = membershipManager.state();
        Assertions.assertEquals(MemberState.JOINING, stateAfterFence);
    }

    /**
     * Makes the member leave the group and checks the two-step completion: the member is LEAVING
     * with a pending result until the heartbeat request is sent, after which it is UNSUBSCRIBED,
     * the result completed normally, the member id is kept, the epoch is -1, the assignment is
     * empty, and the subscription state was assigned an empty set.
     *
     * @param membershipManager member that leaves the group
     */
    private void testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(MembershipManager membershipManager) {
        mockLeaveGroup();
        // Wildcard instead of a raw CompletableFuture; only completion status is inspected.
        CompletableFuture<?> leaveResult = membershipManager.leaveGroup();
        Assertions.assertEquals(MemberState.LEAVING, membershipManager.state());
        Assertions.assertFalse(leaveResult.isDone(), "Leave group result should not complete until the heartbeat request to leave is sent out.");

        membershipManager.onHeartbeatRequestSent();
        Assertions.assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
        Assertions.assertTrue(leaveResult.isDone());
        Assertions.assertFalse(leaveResult.isCompletedExceptionally());
        Assertions.assertEquals(MEMBER_ID, membershipManager.memberId());
        Assertions.assertEquals(-1, membershipManager.memberEpoch());
        Assertions.assertTrue(membershipManager.currentAssignment().isEmpty());
        Mockito.verify(this.subscriptionState).assignFromSubscribed(Collections.emptySet());
    }

    /**
     * Stubs the subscription state for a leave-group scenario: one auto-assigned partition, no
     * rebalance listener, and {@code markPendingRevocation} doing nothing.
     */
    private void mockLeaveGroup() {
        mockMemberHasAutoAssignedPartition();
        Mockito.doNothing().when(this.subscriptionState).markPendingRevocation(ArgumentMatchers.anySet());
    }

    /**
     * Starts a leave-group for a member owning partition 0 of "topic1" with a registered listener
     * and auto-commit disabled, runs the resulting {@code onPartitionsRevoked} callback, but does
     * not report its completion to the member.
     *
     * @param membershipManagerImpl            member that leaves the group
     * @param consumerRebalanceListenerInvoker invoker used to run the user callback
     * @return the completed event, left for the caller to deliver
     */
    private ConsumerRebalanceListenerCallbackCompletedEvent mockPrepareLeavingStuckOnUserCallback(MembershipManagerImpl membershipManagerImpl, ConsumerRebalanceListenerInvoker consumerRebalanceListenerInvoker) {
        TopicPartition ownedPartition = new TopicPartition("topic1", 0);
        CounterConsumerRebalanceListener listener = new CounterConsumerRebalanceListener();

        Mockito.when(this.subscriptionState.assignedPartitions()).thenReturn(Collections.singleton(ownedPartition));
        Mockito.when(this.subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
        Mockito.when(this.subscriptionState.rebalanceListener()).thenReturn(Optional.of(listener));
        Mockito.doNothing().when(this.subscriptionState).markPendingRevocation(ArgumentMatchers.anySet());
        Mockito.when(this.commitRequestManager.autoCommitEnabled()).thenReturn(false);

        membershipManagerImpl.leaveGroup();

        SortedSet<TopicPartition> expectedRevoked = topicPartitions(ownedPartition.topic(), ownedPartition.partition());
        // Final argument false: the completion is deliberately not handed back to the member.
        return performCallback(membershipManagerImpl, consumerRebalanceListenerInvoker,
                ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, expectedRevoked, false);
    }

    /**
     * Transitions the member to FATAL and checks that its member id and epoch are unchanged.
     *
     * @param membershipManagerImpl member to transition
     */
    private void testStateUpdateOnFatalFailure(MembershipManagerImpl membershipManagerImpl) {
        // Snapshot identity before the transition.
        String idBefore = membershipManagerImpl.memberId();
        int epochBefore = membershipManagerImpl.memberEpoch();
        Mockito.when(this.subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);

        membershipManagerImpl.transitionToFatal();

        Assertions.assertEquals(MemberState.FATAL, membershipManagerImpl.state());
        Assertions.assertEquals(idBefore, membershipManagerImpl.memberId());
        Assertions.assertEquals(epochBefore, membershipManagerImpl.memberEpoch());
    }

    /**
     * Builds a successful heartbeat response for {@code MEMBER_ID} with member epoch 1.
     *
     * @param assignment assignment to include; callers in this class also pass {@code null}
     */
    private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponse(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
        ConsumerGroupHeartbeatResponseData responseData = new ConsumerGroupHeartbeatResponseData()
                .setErrorCode(Errors.NONE.code())
                .setMemberId(MEMBER_ID)
                .setMemberEpoch(1)
                .setAssignment(assignment);
        return new ConsumerGroupHeartbeatResponse(responseData);
    }

    /**
     * Builds a successful heartbeat response for {@code MEMBER_ID} with member epoch 2.
     *
     * @param assignment assignment to include in the response
     */
    private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponseWithBumpedEpoch(ConsumerGroupHeartbeatResponseData.Assignment assignment) {
        ConsumerGroupHeartbeatResponseData responseData = new ConsumerGroupHeartbeatResponseData()
                .setErrorCode(Errors.NONE.code())
                .setMemberId(MEMBER_ID)
                .setMemberEpoch(2)
                .setAssignment(assignment);
        return new ConsumerGroupHeartbeatResponse(responseData);
    }

    /**
     * Builds a heartbeat response carrying the {@code UNKNOWN_MEMBER_ID} error code for
     * {@code MEMBER_ID}, with member epoch 5 and no assignment set.
     */
    private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponseWithError() {
        ConsumerGroupHeartbeatResponseData responseData = new ConsumerGroupHeartbeatResponseData()
                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
                .setMemberId(MEMBER_ID)
                .setMemberEpoch(5);
        return new ConsumerGroupHeartbeatResponse(responseData);
    }

    /**
     * Builds an assignment of partitions 0-2 of one random topic id and 3-5 of another.
     *
     * @param z when true, the metadata is stubbed so the two ids resolve to "topic1" and "topic2";
     *          when false, the metadata stubbing is left untouched
     * @return the two-topic assignment
     */
    private ConsumerGroupHeartbeatResponseData.Assignment createAssignment(boolean z) {
        Uuid firstTopicId = Uuid.randomUuid();
        Uuid secondTopicId = Uuid.randomUuid();
        if (z) {
            // Typed map instead of a raw HashMap.
            Map<Uuid, String> topicNames = new HashMap<>();
            topicNames.put(firstTopicId, "topic1");
            topicNames.put(secondTopicId, "topic2");
            Mockito.when(this.metadata.topicNames()).thenReturn(topicNames);
        }
        ConsumerGroupHeartbeatResponseData.TopicPartitions firstTopic = new ConsumerGroupHeartbeatResponseData.TopicPartitions()
                .setTopicId(firstTopicId)
                .setPartitions(Arrays.asList(0, 1, 2));
        ConsumerGroupHeartbeatResponseData.TopicPartitions secondTopic = new ConsumerGroupHeartbeatResponseData.TopicPartitions()
                .setTopicId(secondTopicId)
                .setPartitions(Arrays.asList(3, 4, 5));
        return new ConsumerGroupHeartbeatResponseData.Assignment().setTopicPartitions(Arrays.asList(firstTopic, secondTopic));
    }

    /**
     * Joins a member with the default two-topic assignment, then delivers a second assignment of
     * partition 0 of the given topic. A heartbeat-sent notification follows each assignment, and
     * the member's current assignment is asserted to be non-empty.
     *
     * @param str  topic name the id resolves to in the stubbed metadata
     * @param uuid topic id of the second assignment
     * @return the member after both assignments
     */
    private MembershipManager memberJoinWithAssignment(String str, Uuid uuid) {
        MembershipManagerImpl membershipManager = mockJoinAndReceiveAssignment(true);
        membershipManager.onHeartbeatRequestSent();

        Map<Uuid, String> topicNames = Collections.singletonMap(uuid, str);
        Mockito.when(this.metadata.topicNames()).thenReturn(topicNames);
        List<Integer> assignedPartitions = Collections.singletonList(0);
        receiveAssignment(uuid, assignedPartitions, membershipManager);
        membershipManager.onHeartbeatRequestSent();

        Assertions.assertFalse(membershipManager.currentAssignment().isEmpty());
        return membershipManager;
    }
}
