package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.class */
public class ConsumerNetworkThreadTest {
    private ConsumerTestBuilder testBuilder;
    private Time time;
    private ConsumerMetadata metadata;
    private NetworkClientDelegate networkClient;
    private BlockingQueue<ApplicationEvent> applicationEventsQueue;
    private ApplicationEventProcessor applicationEventProcessor;
    private OffsetsRequestManager offsetsRequestManager;
    private CommitRequestManager commitRequestManager;
    private CoordinatorRequestManager coordinatorRequestManager;
    private ConsumerNetworkThread consumerNetworkThread;
    private final CompletableEventReaper applicationEventReaper = (CompletableEventReaper) Mockito.mock(CompletableEventReaper.class);
    private MockClient client;

    @BeforeEach
    public void setup() {
        this.testBuilder = new ConsumerTestBuilder(ConsumerTestBuilder.createDefaultGroupInformation());
        this.time = this.testBuilder.time;
        this.metadata = this.testBuilder.metadata;
        this.networkClient = this.testBuilder.networkClientDelegate;
        this.client = this.testBuilder.client;
        this.applicationEventsQueue = this.testBuilder.applicationEventQueue;
        this.applicationEventProcessor = this.testBuilder.applicationEventProcessor;
        this.commitRequestManager = this.testBuilder.commitRequestManager.orElseThrow(IllegalStateException::new);
        this.offsetsRequestManager = this.testBuilder.offsetsRequestManager;
        this.coordinatorRequestManager = this.testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new);
        this.consumerNetworkThread = new ConsumerNetworkThread(this.testBuilder.logContext, this.time, this.testBuilder.applicationEventQueue, this.applicationEventReaper, () -> {
            return this.applicationEventProcessor;
        }, () -> {
            return this.testBuilder.networkClientDelegate;
        }, () -> {
            return this.testBuilder.requestManagers;
        });
        this.consumerNetworkThread.initializeResources();
    }

    @AfterEach
    public void tearDown() {
        if (this.testBuilder != null) {
            this.testBuilder.close();
            this.consumerNetworkThread.close(Duration.ZERO);
        }
    }

    @Test
    public void testStartupAndTearDown() throws InterruptedException {
        this.consumerNetworkThread.start();
        TestCondition testCondition = () -> {
            return this.consumerNetworkThread.isRunning();
        };
        TestCondition testCondition2 = () -> {
            return (this.consumerNetworkThread.isRunning() || this.consumerNetworkThread.isAlive()) ? false : true;
        };
        TestUtils.waitForCondition(testCondition, "The consumer network thread did not start within 15000 ms");
        this.consumerNetworkThread.close(Duration.ofMillis(TestUtils.DEFAULT_MAX_WAIT_MS));
        TestUtils.waitForCondition(testCondition2, "The consumer network thread did not stop within 15000 ms");
    }

    @Test
    public void testApplicationEvent() {
        ApplicationEvent pollEvent = new PollEvent(100L);
        this.applicationEventsQueue.add(pollEvent);
        this.consumerNetworkThread.runOnce();
        ((ApplicationEventProcessor) Mockito.verify(this.applicationEventProcessor, Mockito.times(1))).process(pollEvent);
    }

    @Test
    public void testMetadataUpdateEvent() {
        this.applicationEventsQueue.add(new NewTopicsMetadataUpdateRequestEvent());
        this.consumerNetworkThread.runOnce();
        ((ConsumerMetadata) Mockito.verify(this.metadata)).requestUpdateForNewTopics();
    }

    @Test
    public void testAsyncCommitEvent() {
        this.applicationEventsQueue.add(new AsyncCommitEvent(new HashMap()));
        this.consumerNetworkThread.runOnce();
        ((ApplicationEventProcessor) Mockito.verify(this.applicationEventProcessor)).process((ApplicationEvent) ArgumentMatchers.any(AsyncCommitEvent.class));
    }

    @Test
    public void testSyncCommitEvent() {
        this.applicationEventsQueue.add(new SyncCommitEvent(new HashMap(), CompletableEvent.calculateDeadlineMs(this.time, 100L)));
        this.consumerNetworkThread.runOnce();
        ((ApplicationEventProcessor) Mockito.verify(this.applicationEventProcessor)).process((ApplicationEvent) ArgumentMatchers.any(SyncCommitEvent.class));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testListOffsetsEventIsProcessed(boolean z) {
        this.applicationEventsQueue.add(new ListOffsetsEvent(Collections.singletonMap(new TopicPartition("topic1", 1), 5L), CompletableEvent.calculateDeadlineMs(this.time, 100L), z));
        this.consumerNetworkThread.runOnce();
        ((ApplicationEventProcessor) Mockito.verify(this.applicationEventProcessor)).process((ApplicationEvent) ArgumentMatchers.any(ListOffsetsEvent.class));
        Assertions.assertTrue(this.applicationEventsQueue.isEmpty());
    }

    @Test
    public void testResetPositionsEventIsProcessed() {
        this.applicationEventsQueue.add(new ResetPositionsEvent(CompletableEvent.calculateDeadlineMs(this.time, 100L)));
        this.consumerNetworkThread.runOnce();
        ((ApplicationEventProcessor) Mockito.verify(this.applicationEventProcessor)).process((ApplicationEvent) ArgumentMatchers.any(ResetPositionsEvent.class));
        Assertions.assertTrue(this.applicationEventsQueue.isEmpty());
    }

    @Test
    public void testResetPositionsProcessFailureIsIgnored() {
        ((OffsetsRequestManager) Mockito.doThrow(new Throwable[]{new NullPointerException()}).when(this.offsetsRequestManager)).resetPositionsIfNeeded();
        this.applicationEventsQueue.add(new ResetPositionsEvent(CompletableEvent.calculateDeadlineMs(this.time, 100L)));
        Assertions.assertDoesNotThrow(() -> {
            this.consumerNetworkThread.runOnce();
        });
        ((ApplicationEventProcessor) Mockito.verify(this.applicationEventProcessor)).process((ApplicationEvent) ArgumentMatchers.any(ResetPositionsEvent.class));
    }

    @Test
    public void testValidatePositionsEventIsProcessed() {
        this.applicationEventsQueue.add(new ValidatePositionsEvent(CompletableEvent.calculateDeadlineMs(this.time, 100L)));
        this.consumerNetworkThread.runOnce();
        ((ApplicationEventProcessor) Mockito.verify(this.applicationEventProcessor)).process((ApplicationEvent) ArgumentMatchers.any(ValidatePositionsEvent.class));
        Assertions.assertTrue(this.applicationEventsQueue.isEmpty());
    }

    @Test
    public void testAssignmentChangeEvent() {
        HashMap<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset = mockTopicPartitionOffset();
        long milliseconds = this.time.milliseconds();
        this.applicationEventsQueue.add(new AssignmentChangeEvent(mockTopicPartitionOffset, milliseconds));
        this.consumerNetworkThread.runOnce();
        ((ApplicationEventProcessor) Mockito.verify(this.applicationEventProcessor)).process((ApplicationEvent) ArgumentMatchers.any(AssignmentChangeEvent.class));
        ((NetworkClientDelegate) Mockito.verify(this.networkClient, Mockito.times(1))).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        ((CommitRequestManager) Mockito.verify(this.commitRequestManager, Mockito.times(1))).updateAutoCommitTimer(milliseconds);
        ((CommitRequestManager) Mockito.verify(this.commitRequestManager, Mockito.times(1))).maybeAutoCommitAsync();
    }

    @Test
    void testFetchTopicMetadata() {
        this.applicationEventsQueue.add(new TopicMetadataEvent("topic", Long.MAX_VALUE));
        this.consumerNetworkThread.runOnce();
        ((ApplicationEventProcessor) Mockito.verify(this.applicationEventProcessor)).process((ApplicationEvent) ArgumentMatchers.any(TopicMetadataEvent.class));
    }

    @Test
    void testPollResultTimer() {
        NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(new FindCoordinatorRequest.Builder(new FindCoordinatorRequestData().setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id()).setKey("foobar")), Optional.empty());
        unsentRequest.setTimer(this.time, 500L);
        Assertions.assertEquals(10L, this.networkClient.addAll(new NetworkClientDelegate.PollResult(10L, Collections.singletonList(unsentRequest))));
        Assertions.assertEquals(10L, this.networkClient.addAll(new NetworkClientDelegate.PollResult(10L, new ArrayList())));
    }

    @Test
    void testMaximumTimeToWait() {
        Assertions.assertEquals(5000L, this.consumerNetworkThread.maximumTimeToWait());
        this.consumerNetworkThread.runOnce();
        Assertions.assertEquals(1000L, this.consumerNetworkThread.maximumTimeToWait());
    }

    @Test
    void testRequestManagersArePolledOnce() {
        this.consumerNetworkThread.runOnce();
        this.testBuilder.requestManagers.entries().forEach(optional -> {
            optional.ifPresent(requestManager -> {
                ((RequestManager) Mockito.verify(requestManager, Mockito.times(1))).poll(ArgumentMatchers.anyLong());
            });
        });
        this.testBuilder.requestManagers.entries().forEach(optional2 -> {
            optional2.ifPresent(requestManager -> {
                ((RequestManager) Mockito.verify(requestManager, Mockito.times(1))).maximumTimeToWait(ArgumentMatchers.anyLong());
            });
        });
        ((NetworkClientDelegate) Mockito.verify(this.networkClient, Mockito.times(1))).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
    }

    @Test
    void testEnsureMetadataUpdateOnPoll() {
        MetadataResponse metadataUpdateWith = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
        this.client.prepareMetadataUpdate(metadataUpdateWith);
        this.metadata.requestUpdate(false);
        this.consumerNetworkThread.runOnce();
        ((ConsumerMetadata) Mockito.verify(this.metadata, Mockito.times(1))).updateWithCurrentRequestVersion((MetadataResponse) ArgumentMatchers.eq(metadataUpdateWith), ArgumentMatchers.eq(false), ArgumentMatchers.anyLong());
    }

    @Test
    void testEnsureEventsAreCompleted() {
        ((CompletableEventReaper) Mockito.doAnswer(invocationOnMock -> {
            Iterator it = this.applicationEventsQueue.iterator();
            while (it.hasNext()) {
                CompletableEvent completableEvent = (ApplicationEvent) it.next();
                if (completableEvent instanceof CompletableEvent) {
                    completableEvent.future().completeExceptionally(new TimeoutException());
                }
                it.remove();
            }
            return null;
        }).when(this.applicationEventReaper)).reap((Collection) ArgumentMatchers.any(Collection.class));
        Node node = (Node) this.metadata.fetch().nodes().get(0);
        this.coordinatorRequestManager.markCoordinatorUnknown("test", this.time.milliseconds());
        this.client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, "group-id", node));
        prepareOffsetCommitRequest(new HashMap(), Errors.NONE, false);
        ApplicationEvent applicationEvent = (CompletableApplicationEvent) Mockito.spy(new AsyncCommitEvent(Collections.emptyMap()));
        ApplicationEvent asyncCommitEvent = new AsyncCommitEvent(Collections.emptyMap());
        CompletableFuture completableFuture = new CompletableFuture();
        Mockito.when(applicationEvent.future()).thenReturn(completableFuture);
        this.applicationEventsQueue.add(applicationEvent);
        this.applicationEventsQueue.add(asyncCommitEvent);
        Assertions.assertFalse(completableFuture.isDone());
        Assertions.assertFalse(this.applicationEventsQueue.isEmpty());
        this.consumerNetworkThread.cleanup();
        Assertions.assertTrue(completableFuture.isCompletedExceptionally());
        Assertions.assertTrue(this.applicationEventsQueue.isEmpty());
    }

    @Test
    void testCleanupInvokesReaper() {
        this.consumerNetworkThread.cleanup();
        ((CompletableEventReaper) Mockito.verify(this.applicationEventReaper)).reap(this.applicationEventsQueue);
    }

    @Test
    void testRunOnceInvokesReaper() {
        this.consumerNetworkThread.runOnce();
        ((CompletableEventReaper) Mockito.verify(this.applicationEventReaper)).reap(((Long) ArgumentMatchers.any(Long.class)).longValue());
    }

    @Test
    void testSendUnsentRequest() {
        this.networkClient.add(new NetworkClientDelegate.UnsentRequest(new FindCoordinatorRequest.Builder(new FindCoordinatorRequestData().setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id()).setKey("group-id")), Optional.empty()));
        Assertions.assertTrue(this.networkClient.hasAnyPendingRequests());
        Assertions.assertFalse(this.networkClient.unsentRequests().isEmpty());
        Assertions.assertFalse(this.client.hasInFlightRequests());
        this.consumerNetworkThread.cleanup();
        Assertions.assertTrue(this.networkClient.unsentRequests().isEmpty());
        Assertions.assertFalse(this.client.hasInFlightRequests());
        Assertions.assertFalse(this.networkClient.hasAnyPendingRequests());
    }

    private void prepareOffsetCommitRequest(Map<TopicPartition, Long> map, Errors errors, boolean z) {
        this.client.prepareResponse(offsetCommitRequestMatcher(map), offsetCommitResponse(partitionErrors(map.keySet(), errors)), z);
    }

    private Map<TopicPartition, Errors> partitionErrors(Collection<TopicPartition> collection, Errors errors) {
        HashMap hashMap = new HashMap();
        Iterator<TopicPartition> it = collection.iterator();
        while (it.hasNext()) {
            hashMap.put(it.next(), errors);
        }
        return hashMap;
    }

    private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Errors> map) {
        return new OffsetCommitResponse(map);
    }

    private MockClient.RequestMatcher offsetCommitRequestMatcher(Map<TopicPartition, Long> map) {
        return abstractRequest -> {
            Map offsets = ((OffsetCommitRequest) abstractRequest).offsets();
            if (offsets.size() != map.size()) {
                return false;
            }
            for (Map.Entry entry : map.entrySet()) {
                if (!offsets.containsKey(entry.getKey()) || !((Long) offsets.get(entry.getKey())).equals(entry.getValue())) {
                    return false;
                }
            }
            return true;
        };
    }

    private HashMap<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
        TopicPartition topicPartition = new TopicPartition("t0", 2);
        TopicPartition topicPartition2 = new TopicPartition("t0", 3);
        HashMap<TopicPartition, OffsetAndMetadata> hashMap = new HashMap<>();
        hashMap.put(topicPartition, new OffsetAndMetadata(10L));
        hashMap.put(topicPartition2, new OffsetAndMetadata(20L));
        return hashMap;
    }
}
