package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.class */
/**
 * Unit tests for {@link ConsumerNetworkThread}.
 *
 * <p>The thread under test is built from Mockito mocks of its collaborators, a {@link MockTime}
 * clock and a real {@link LinkedBlockingQueue} of application events. Most tests drive it
 * synchronously through {@code runOnce()} or {@code cleanup()} rather than starting the thread.
 */
public class ConsumerNetworkThreadTest {
    /**
     * Upper bound on the poll timeout that these tests expect from the thread under test.
     * NOTE(review): presumably mirrors a constant owned by ConsumerNetworkThread; confirm and
     * reference that constant directly if it is accessible from here.
     */
    private static final long EXPECTED_MAX_POLL_TIMEOUT_MS = 5000L;

    private final NetworkClientDelegate networkClientDelegate = Mockito.mock(NetworkClientDelegate.class);
    private final RequestManagers requestManagers = Mockito.mock(RequestManagers.class);
    private final OffsetsRequestManager offsetsRequestManager = Mockito.mock(OffsetsRequestManager.class);
    private final HeartbeatRequestManager heartbeatRequestManager = Mockito.mock(HeartbeatRequestManager.class);
    private final CoordinatorRequestManager coordinatorRequestManager = Mockito.mock(CoordinatorRequestManager.class);
    private final ApplicationEventProcessor applicationEventProcessor = Mockito.mock(ApplicationEventProcessor.class);
    private final CompletableEventReaper applicationEventReaper = Mockito.mock(CompletableEventReaper.class);
    private final Time time = new MockTime();
    private final BlockingQueue<ApplicationEvent> applicationEventsQueue = new LinkedBlockingQueue<>();

    /** Thread under test; each supplier hands back the corresponding mock declared above. */
    private final ConsumerNetworkThread consumerNetworkThread = new ConsumerNetworkThread(
            new LogContext(),
            time,
            applicationEventsQueue,
            applicationEventReaper,
            () -> applicationEventProcessor,
            () -> networkClientDelegate,
            () -> requestManagers);

    ConsumerNetworkThreadTest() {
    }

    /** Initializes the thread's resources before every test so {@code runOnce()} can be called directly. */
    @BeforeEach
    public void setup() {
        consumerNetworkThread.initializeResources();
    }

    /** Closes the thread after every test. */
    @AfterEach
    public void tearDown() {
        consumerNetworkThread.close();
    }

    /** Checks that {@code isRunning()} is true on a freshly created thread and false after {@code close()}. */
    @Test
    public void testEnsureCloseStopsRunningThread() {
        Assertions.assertTrue(consumerNetworkThread.isRunning(), "ConsumerNetworkThread should start running when created");
        consumerNetworkThread.close();
        Assertions.assertFalse(consumerNetworkThread.isRunning(), "close() should make consumerNetworkThread.running false by calling closeInternal(Duration timeout)");
    }

    /**
     * Checks the timeout passed to the network client and the value reported by
     * {@code maximumTimeToWait()} for values just below, at and just above the expected cap.
     *
     * @param exampleTime wait time reported by the coordinator manager; the heartbeat manager
     *                    reports {@code exampleTime + 100}
     */
    @ValueSource(longs = {EXPECTED_MAX_POLL_TIMEOUT_MS - 1, EXPECTED_MAX_POLL_TIMEOUT_MS, EXPECTED_MAX_POLL_TIMEOUT_MS + 1})
    @ParameterizedTest
    public void testConsumerNetworkThreadPollTimeComputations(long exampleTime) {
        List<Optional<? extends RequestManager>> managers = new ArrayList<>();
        managers.add(Optional.of(coordinatorRequestManager));
        managers.add(Optional.of(heartbeatRequestManager));
        Mockito.when(requestManagers.entries()).thenReturn(managers);

        NetworkClientDelegate.PollResult coordinatorResult = new NetworkClientDelegate.PollResult(exampleTime);
        NetworkClientDelegate.PollResult heartbeatResult = new NetworkClientDelegate.PollResult(exampleTime + 100);

        long nowMs = time.milliseconds();
        Mockito.when(coordinatorRequestManager.poll(nowMs)).thenReturn(coordinatorResult);
        Mockito.when(coordinatorRequestManager.maximumTimeToWait(nowMs)).thenReturn(exampleTime);
        Mockito.when(heartbeatRequestManager.poll(nowMs)).thenReturn(heartbeatResult);
        Mockito.when(heartbeatRequestManager.maximumTimeToWait(nowMs)).thenReturn(exampleTime + 100);
        Mockito.when(networkClientDelegate.addAll(coordinatorResult)).thenReturn(coordinatorResult.timeUntilNextPollMs);
        Mockito.when(networkClientDelegate.addAll(heartbeatResult)).thenReturn(heartbeatResult.timeUntilNextPollMs);

        consumerNetworkThread.runOnce();

        // The smaller of the two manager timeouts is expected, capped at the maximum poll timeout.
        Mockito.verify(networkClientDelegate).poll(Math.min(exampleTime, EXPECTED_MAX_POLL_TIMEOUT_MS), time.milliseconds());
        // Expected value goes first so a failure message labels the two values correctly.
        Assertions.assertEquals(exampleTime, consumerNetworkThread.maximumTimeToWait());
    }

    /**
     * Starts the real thread, waits until it reports running, closes it and waits until it is
     * neither running nor alive.
     *
     * @throws InterruptedException if interrupted while waiting for either condition
     */
    @Test
    public void testStartupAndTearDown() throws InterruptedException {
        consumerNetworkThread.start();
        TestCondition started = consumerNetworkThread::isRunning;
        TestCondition stopped = () -> !consumerNetworkThread.isRunning() && !consumerNetworkThread.isAlive();

        TestUtils.waitForCondition(started, "The consumer network thread did not start within 15000 ms");
        consumerNetworkThread.close(Duration.ofMillis(TestUtils.DEFAULT_MAX_WAIT_MS));
        TestUtils.waitForCondition(stopped, "The consumer network thread did not stop within 15000 ms");
    }

    /**
     * Checks that one {@code runOnce()} polls every registered request manager, asks each for its
     * maximum wait time, and calls {@code addAll} and {@code poll} on the network client.
     */
    @Test
    public void testRequestsTransferFromManagersToClientOnThreadRun() {
        List<Optional<? extends RequestManager>> managers = new ArrayList<>();
        managers.add(Optional.of(coordinatorRequestManager));
        managers.add(Optional.of(heartbeatRequestManager));
        managers.add(Optional.of(offsetsRequestManager));
        Mockito.when(requestManagers.entries()).thenReturn(managers);
        Mockito.when(coordinatorRequestManager.poll(ArgumentMatchers.anyLong()))
                .thenReturn(Mockito.mock(NetworkClientDelegate.PollResult.class));

        consumerNetworkThread.runOnce();

        requestManagers.entries().forEach(entry ->
                entry.ifPresent(manager -> Mockito.verify(manager).poll(ArgumentMatchers.anyLong())));
        requestManagers.entries().forEach(entry ->
                entry.ifPresent(manager -> Mockito.verify(manager).maximumTimeToWait(ArgumentMatchers.anyLong())));
        Mockito.verify(networkClientDelegate).addAll(ArgumentMatchers.any(NetworkClientDelegate.PollResult.class));
        Mockito.verify(networkClientDelegate).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
    }

    /**
     * Checks that a queued event is drained by {@code runOnce()} and handed to the processor, and
     * that completable events are also registered with the reaper.
     *
     * @param applicationEvent event supplied by {@link #applicationEvents()}
     */
    @MethodSource({"applicationEvents"})
    @ParameterizedTest
    public void testApplicationEventIsProcessed(ApplicationEvent applicationEvent) {
        applicationEventsQueue.add(applicationEvent);

        consumerNetworkThread.runOnce();

        if (applicationEvent instanceof CompletableEvent) {
            Mockito.verify(applicationEventReaper).add((CompletableEvent<?>) applicationEvent);
        }
        Mockito.verify(applicationEventProcessor).process(ArgumentMatchers.any(applicationEvent.getClass()));
        Assertions.assertTrue(applicationEventsQueue.isEmpty());
    }

    /**
     * Checks that a queued {@link ListOffsetsEvent} is drained and handed to the processor.
     *
     * @param flag value passed as the boolean constructor argument of {@link ListOffsetsEvent}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testListOffsetsEventIsProcessed(boolean flag) {
        long deadlineMs = CompletableEvent.calculateDeadlineMs(time, 100L);
        applicationEventsQueue.add(
                new ListOffsetsEvent(Collections.singletonMap(new TopicPartition("topic1", 1), 5L), deadlineMs, flag));

        consumerNetworkThread.runOnce();

        Mockito.verify(applicationEventProcessor).process(ArgumentMatchers.any(ListOffsetsEvent.class));
        Assertions.assertTrue(applicationEventsQueue.isEmpty());
    }

    /**
     * Stubs {@code resetPositionsIfNeeded()} to throw, queues a {@link ResetPositionsEvent} and
     * checks that {@code runOnce()} does not throw and still hands the event to the processor.
     */
    @Test
    public void testResetPositionsProcessFailureIsIgnored() {
        // NOTE(review): the processor here is a mock, so it is unclear from this file whether the
        // stubbed exception is ever triggered during runOnce(); confirm against the thread's code.
        Mockito.doThrow(new NullPointerException()).when(offsetsRequestManager).resetPositionsIfNeeded();
        applicationEventsQueue.add(new ResetPositionsEvent(CompletableEvent.calculateDeadlineMs(time, 100L)));

        Assertions.assertDoesNotThrow(() -> consumerNetworkThread.runOnce());

        Mockito.verify(applicationEventProcessor).process(ArgumentMatchers.any(ResetPositionsEvent.class));
    }

    /**
     * Checks that {@code maximumTimeToWait()} starts at the expected cap and, after one
     * {@code runOnce()}, reflects the value reported by the only registered request manager.
     */
    @Test
    public void testMaximumTimeToWait() {
        Assertions.assertEquals(EXPECTED_MAX_POLL_TIMEOUT_MS, consumerNetworkThread.maximumTimeToWait());

        Mockito.when(requestManagers.entries())
                .thenReturn(Collections.singletonList(Optional.of(heartbeatRequestManager)));
        Mockito.when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn(1000L);

        consumerNetworkThread.runOnce();

        Assertions.assertEquals(1000L, consumerNetworkThread.maximumTimeToWait());
    }

    /** Checks that {@code cleanup()} calls the reaper with the application event queue. */
    @Test
    public void testCleanupInvokesReaper() {
        Mockito.when(networkClientDelegate.unsentRequests()).thenReturn(new LinkedList<>());

        consumerNetworkThread.cleanup();

        Mockito.verify(applicationEventReaper).reap(applicationEventsQueue);
    }

    /** Checks that {@code runOnce()} calls the reaper's {@code long} overload of {@code reap}. */
    @Test
    public void testRunOnceInvokesReaper() {
        consumerNetworkThread.runOnce();

        // anyLong() avoids unboxing the value returned by a boxed any(Long.class) matcher.
        Mockito.verify(applicationEventReaper).reap(ArgumentMatchers.anyLong());
    }

    /**
     * Checks that {@code cleanup()} polls the network client twice when pending requests are
     * reported on the first two checks and not on the third.
     */
    @Test
    public void testSendUnsentRequests() {
        Mockito.when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true, true, false);

        consumerNetworkThread.cleanup();

        Mockito.verify(networkClientDelegate, Mockito.times(2)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
    }

    /**
     * Supplies one instance of each application event type exercised by
     * {@link #testApplicationEventIsProcessed(ApplicationEvent)}.
     *
     * @return a stream with one single-argument {@link Arguments} per event
     */
    private static Stream<Arguments> applicationEvents() {
        return Stream.of(
                Arguments.of(new PollEvent(100L)),
                Arguments.of(new NewTopicsMetadataUpdateRequestEvent()),
                Arguments.of(new AsyncCommitEvent(new HashMap<>())),
                Arguments.of(new SyncCommitEvent(new HashMap<>(), 500L)),
                Arguments.of(new ResetPositionsEvent(500L)),
                Arguments.of(new ValidatePositionsEvent(500L)),
                Arguments.of(new TopicMetadataEvent("topic", Long.MAX_VALUE)),
                Arguments.of(new AssignmentChangeEvent(new HashMap<>(), 12345L)));
    }
}
