/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.internals.ConsumerHeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class ConsumerNetworkThreadTest {
    /** Clock handed to every network thread built here; a {@link MockTime} so tests advance it manually. */
    private final Time time;
    /** Event queue passed to the network thread under test. */
    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
    /** Mocked processor supplied to the network thread. */
    private final ApplicationEventProcessor applicationEventProcessor;
    /** Mocked request managers that individual tests place into {@code requestManagers.entries()}. */
    private final OffsetsRequestManager offsetsRequestManager;
    private final ConsumerHeartbeatRequestManager heartbeatRequestManager;
    private final CoordinatorRequestManager coordinatorRequestManager;
    /** The instance under test, wired exclusively with the mocks declared in this class. */
    private final ConsumerNetworkThread consumerNetworkThread;
    /** Mocked network client delegate supplied to the network thread. */
    private final NetworkClientDelegate networkClientDelegate = Mockito.mock(NetworkClientDelegate.class);
    /** Mocked container of request managers supplied to the network thread. */
    private final RequestManagers requestManagers = Mockito.mock(RequestManagers.class);
    /** Mocked reaper used to verify that expired events are reaped. */
    private final CompletableEventReaper applicationEventReaper;
    /** Mocked metrics sink used to verify recorded values. */
    private final AsyncConsumerMetrics asyncConsumerMetrics;

    /**
     * Builds the shared fixture: a mock clock, an empty event queue, mocked collaborators,
     * and a {@link ConsumerNetworkThread} constructed from them.
     */
    ConsumerNetworkThreadTest() {
        // Plain (non-mock) state.
        time = new MockTime();
        applicationEventQueue = new LinkedBlockingQueue<>();

        // Mocked collaborators; creation order is irrelevant because the mocks are independent.
        applicationEventProcessor = Mockito.mock(ApplicationEventProcessor.class);
        applicationEventReaper = Mockito.mock(CompletableEventReaper.class);
        asyncConsumerMetrics = Mockito.mock(AsyncConsumerMetrics.class);
        coordinatorRequestManager = Mockito.mock(CoordinatorRequestManager.class);
        heartbeatRequestManager = Mockito.mock(ConsumerHeartbeatRequestManager.class);
        offsetsRequestManager = Mockito.mock(OffsetsRequestManager.class);

        consumerNetworkThread = new ConsumerNetworkThread(
            new LogContext(),
            time,
            applicationEventQueue,
            applicationEventReaper,
            () -> this.applicationEventProcessor,
            () -> this.networkClientDelegate,
            () -> this.requestManagers,
            asyncConsumerMetrics
        );
    }

    /**
     * Initializes the resources of the shared network thread before each test, so tests can
     * call {@code runOnce()} and {@code cleanup()} directly.
     */
    @BeforeEach
    public void setup() {
        consumerNetworkThread.initializeResources();
    }

    /**
     * Closes the shared network thread after each test, if one was created.
     */
    @AfterEach
    public void tearDown() {
        if (consumerNetworkThread == null) {
            return;
        }
        consumerNetworkThread.close();
    }

    /**
     * The thread reports itself as running after the fixture is set up and as not running
     * once {@code close()} has returned.
     */
    @Test
    public void testEnsureCloseStopsRunningThread() {
        final boolean runningBeforeClose = consumerNetworkThread.isRunning();
        Assertions.assertTrue(runningBeforeClose, "ConsumerNetworkThread should start running when created");

        consumerNetworkThread.close();

        final boolean runningAfterClose = consumerNetworkThread.isRunning();
        Assertions.assertFalse(runningAfterClose, "close() should make consumerNetworkThread.running false by calling closeInternal(Duration timeout)");
    }

    /**
     * Runs a single loop iteration with two stubbed request managers and checks both the
     * timeout handed to {@code NetworkClientDelegate.poll} and the cached maximum wait time.
     *
     * <p>The coordinator manager reports {@code exampleTime} and the heartbeat manager reports
     * {@code exampleTime + 100}. The test expects the poll timeout to be the smaller of
     * {@code exampleTime} and 5000 ms, while {@code maximumTimeToWait()} is expected to equal
     * {@code exampleTime} itself.
     *
     * @param exampleTime wait time (ms) reported by the coordinator request manager
     */
    @ParameterizedTest
    @ValueSource(longs = {4999L, 5000L, 5001L})
    public void testConsumerNetworkThreadPollTimeComputations(long exampleTime) {
        // The list mixes manager implementations, so its element type must be the shared
        // RequestManager type rather than one concrete manager class.
        List<RequestManager> managers = List.of(coordinatorRequestManager, heartbeatRequestManager);
        Mockito.when(requestManagers.entries()).thenReturn(managers);

        NetworkClientDelegate.PollResult coordinatorResult = new NetworkClientDelegate.PollResult(exampleTime);
        NetworkClientDelegate.PollResult heartbeatResult = new NetworkClientDelegate.PollResult(exampleTime + 100L);

        // MockTime does not advance on its own, so runOnce() observes this same timestamp.
        long nowMs = time.milliseconds();
        Mockito.when(coordinatorRequestManager.poll(nowMs)).thenReturn(coordinatorResult);
        Mockito.when(coordinatorRequestManager.maximumTimeToWait(nowMs)).thenReturn(exampleTime);
        Mockito.when(heartbeatRequestManager.poll(nowMs)).thenReturn(heartbeatResult);
        Mockito.when(heartbeatRequestManager.maximumTimeToWait(nowMs)).thenReturn(exampleTime + 100L);
        Mockito.when(networkClientDelegate.addAll(coordinatorResult)).thenReturn(coordinatorResult.timeUntilNextPollMs);
        Mockito.when(networkClientDelegate.addAll(heartbeatResult)).thenReturn(heartbeatResult.timeUntilNextPollMs);

        consumerNetworkThread.runOnce();

        Mockito.verify(networkClientDelegate).poll(Math.min(exampleTime, 5000L), time.milliseconds());
        // JUnit convention: expected value first, so a failure message reads correctly.
        Assertions.assertEquals(exampleTime, consumerNetworkThread.maximumTimeToWait());
    }

    /**
     * Starts the real thread, waits until it reports running, closes it with a 15 second
     * timeout, and waits until it is neither running nor alive.
     *
     * @throws InterruptedException if interrupted while waiting for either condition
     */
    @Test
    public void testStartupAndTearDown() throws InterruptedException {
        final TestCondition threadRunning = consumerNetworkThread::isRunning;
        final TestCondition threadStopped = () -> !(consumerNetworkThread.isRunning() || consumerNetworkThread.isAlive());

        consumerNetworkThread.start();
        // The thread needs some time after start() before it reports running, hence the wait.
        TestUtils.waitForCondition(threadRunning, "The consumer network thread did not start within 15000 ms");

        consumerNetworkThread.close(Duration.ofMillis(15000L));
        TestUtils.waitForCondition(threadStopped, "The consumer network thread did not stop within 15000 ms");
    }

    /**
     * A single loop iteration must poll every request manager, ask each one for its maximum
     * wait time, forward a poll result to the network client delegate, and poll the delegate.
     */
    @Test
    public void testRequestsTransferFromManagersToClientOnThreadRun() {
        // The list mixes three manager implementations, so its element type must be the
        // shared RequestManager type rather than one concrete manager class.
        List<RequestManager> managers = List.of(coordinatorRequestManager, heartbeatRequestManager, offsetsRequestManager);
        Mockito.when(requestManagers.entries()).thenReturn(managers);
        // Only the coordinator manager returns a poll result; the other mocks return their defaults.
        Mockito.when(coordinatorRequestManager.poll(ArgumentMatchers.anyLong()))
            .thenReturn(Mockito.mock(NetworkClientDelegate.PollResult.class));

        consumerNetworkThread.runOnce();

        requestManagers.entries().forEach(rm -> Mockito.verify(rm).poll(ArgumentMatchers.anyLong()));
        requestManagers.entries().forEach(rm -> Mockito.verify(rm).maximumTimeToWait(ArgumentMatchers.anyLong()));
        Mockito.verify(networkClientDelegate).addAll(ArgumentMatchers.any(NetworkClientDelegate.PollResult.class));
        Mockito.verify(networkClientDelegate).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
    }

    /**
     * {@code maximumTimeToWait()} returns 5000 ms before any loop iteration has run, and
     * afterwards returns the value reported by the (only) registered request manager.
     */
    @Test
    public void testMaximumTimeToWait() {
        final long defaultHeartbeatIntervalMs = 1000L;

        // Initial value, before runOnce() has been called.
        Assertions.assertEquals(5000L, consumerNetworkThread.maximumTimeToWait());

        Mockito.when(requestManagers.entries()).thenReturn(List.of(heartbeatRequestManager));
        Mockito.when(heartbeatRequestManager.maximumTimeToWait(time.milliseconds())).thenReturn(defaultHeartbeatIntervalMs);

        consumerNetworkThread.runOnce();

        // After runOnce() the value comes from the heartbeat request manager stub.
        Assertions.assertEquals(defaultHeartbeatIntervalMs, consumerNetworkThread.maximumTimeToWait());
    }

    /**
     * {@code cleanup()} must reap the application event queue and record the number of
     * expired events returned by the reaper (stubbed to 1 here).
     */
    @Test
    public void testCleanupInvokesReaper() {
        // Empty, properly parameterized queue; the element type is inferred from the stubbed
        // method's return type instead of falling back to a raw LinkedList.
        Mockito.when(networkClientDelegate.unsentRequests()).thenReturn(new LinkedList<>());
        Mockito.when(applicationEventReaper.reap(applicationEventQueue)).thenReturn(1L);

        consumerNetworkThread.cleanup();

        Mockito.verify(applicationEventReaper).reap(applicationEventQueue);
        Mockito.verify(asyncConsumerMetrics).recordApplicationEventExpiredSize(1L);
    }

    /**
     * A single loop iteration must invoke the time-based reaper and record the number of
     * expired events it returns (stubbed to 1 here).
     */
    @Test
    public void testRunOnceInvokesReaper() {
        // anyLong() is the matcher for a primitive long argument; it avoids manually
        // unboxing the placeholder value returned by any(Long.class).
        Mockito.when(applicationEventReaper.reap(ArgumentMatchers.anyLong())).thenReturn(1L);

        consumerNetworkThread.runOnce();

        Mockito.verify(applicationEventReaper).reap(ArgumentMatchers.anyLong());
        Mockito.verify(asyncConsumerMetrics).recordApplicationEventExpiredSize(1L);
    }

    /**
     * While the delegate reports pending requests (true, true, then false), {@code cleanup()}
     * is expected to poll the delegate exactly twice.
     */
    @Test
    public void testSendUnsentRequests() {
        Mockito.when(networkClientDelegate.hasAnyPendingRequests()).thenReturn(true, true, false);

        consumerNetworkThread.cleanup();

        final VerificationMode exactlyTwice = Mockito.times(2);
        Mockito.verify(networkClientDelegate, exactlyTwice).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
    }

    /**
     * Uses a real {@link Metrics} registry: two loop iterations separated by 10 ms of mock
     * time must yield an average and a maximum time-between-poll of 10.0.
     */
    @Test
    public void testRunOnceRecordTimeBetweenNetworkThreadPoll() {
        try (Metrics registry = new Metrics();
             AsyncConsumerMetrics realMetrics = new AsyncConsumerMetrics(registry);
             ConsumerNetworkThread networkThread = new ConsumerNetworkThread(
                 new LogContext(),
                 time,
                 applicationEventQueue,
                 applicationEventReaper,
                 () -> applicationEventProcessor,
                 () -> networkClientDelegate,
                 () -> requestManagers,
                 realMetrics)) {
            networkThread.initializeResources();

            networkThread.runOnce();
            time.sleep(10L);
            networkThread.runOnce();

            final double avgMs = (Double) registry.metric(registry.metricName("time-between-network-thread-poll-avg", "consumer-metrics")).metricValue();
            Assertions.assertEquals(10.0, avgMs);
            final double maxMs = (Double) registry.metric(registry.metricName("time-between-network-thread-poll-max", "consumer-metrics")).metricValue();
            Assertions.assertEquals(10.0, maxMs);
        }
    }

    /**
     * Uses a real {@link Metrics} registry: an event enqueued 10 ms (mock time) before the
     * loop iteration must leave the queue-size metric at 0.0 and produce an average and a
     * maximum queue time of 10.0.
     */
    @Test
    public void testRunOnceRecordApplicationEventQueueSizeAndApplicationEventQueueTime() {
        try (Metrics registry = new Metrics();
             AsyncConsumerMetrics realMetrics = new AsyncConsumerMetrics(registry);
             ConsumerNetworkThread networkThread = new ConsumerNetworkThread(
                 new LogContext(),
                 time,
                 applicationEventQueue,
                 applicationEventReaper,
                 () -> applicationEventProcessor,
                 () -> networkClientDelegate,
                 () -> requestManagers,
                 realMetrics)) {
            networkThread.initializeResources();

            // Enqueue one event stamped with the current mock time and record the queue size.
            PollEvent queuedEvent = new PollEvent(0L);
            queuedEvent.setEnqueuedMs(time.milliseconds());
            applicationEventQueue.add(queuedEvent);
            realMetrics.recordApplicationEventQueueSize(1);

            time.sleep(10L);
            networkThread.runOnce();

            final double queueSize = (Double) registry.metric(registry.metricName("application-event-queue-size", "consumer-metrics")).metricValue();
            Assertions.assertEquals(0.0, queueSize);
            final double queueTimeAvg = (Double) registry.metric(registry.metricName("application-event-queue-time-avg", "consumer-metrics")).metricValue();
            Assertions.assertEquals(10.0, queueTimeAvg);
            final double queueTimeMax = (Double) registry.metric(registry.metricName("application-event-queue-time-max", "consumer-metrics")).metricValue();
            Assertions.assertEquals(10.0, queueTimeMax);
        }
    }

    /**
     * Resource initialization fails when only the {@code NetworkClientDelegate} supplier throws.
     */
    @Test
    public void testNetworkClientDelegateInitializeResourcesError() {
        testInitializeResourcesError(
            () -> {
                throw new KafkaException("Injecting NetworkClientDelegate initialization failure");
            },
            () -> requestManagers
        );
    }

    /**
     * Resource initialization fails when only the {@code RequestManagers} supplier throws.
     */
    @Test
    public void testRequestManagersInitializeResourcesError() {
        testInitializeResourcesError(
            () -> networkClientDelegate,
            () -> {
                throw new KafkaException("Injecting RequestManagers initialization failure");
            }
        );
    }

    /**
     * Resource initialization fails when both the {@code NetworkClientDelegate} supplier and
     * the {@code RequestManagers} supplier throw.
     */
    @Test
    public void testNetworkClientDelegateAndRequestManagersInitializeResourcesError() {
        testInitializeResourcesError(
            () -> {
                throw new KafkaException("Injecting NetworkClientDelegate initialization failure");
            },
            () -> {
                throw new KafkaException("Injecting RequestManagers initialization failure");
            }
        );
    }

    /**
     * Builds a network thread from the given suppliers and asserts that
     * {@code initializeResources()} throws a {@link KafkaException} while a subsequent
     * {@code cleanup()} does not throw.
     *
     * @param networkClientDelegateSupplier supplier handed to the thread for its network client delegate
     * @param requestManagersSupplier supplier handed to the thread for its request managers
     */
    private void testInitializeResourcesError(Supplier<NetworkClientDelegate> networkClientDelegateSupplier, Supplier<RequestManagers> requestManagersSupplier) {
        try (ConsumerNetworkThread failingThread = new ConsumerNetworkThread(
                 new LogContext(),
                 time,
                 applicationEventQueue,
                 applicationEventReaper,
                 () -> applicationEventProcessor,
                 networkClientDelegateSupplier,
                 requestManagersSupplier,
                 asyncConsumerMetrics)) {
            Assertions.assertThrows(
                KafkaException.class,
                failingThread::initializeResources,
                "initializeResources should fail because one or more Supplier throws an error on get()");
            Assertions.assertDoesNotThrow(
                () -> failingThread.cleanup(),
                "cleanup() should not cause an error because all references are checked before use");
        }
    }
}

