package kafka.utils;

import java.io.File;
import java.nio.file.Path;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.log.LoadedLogOffsets;
import kafka.log.LocalLog;
import kafka.log.LocalLog$;
import kafka.log.LogConfig;
import kafka.log.LogConfig$;
import kafka.log.LogLoader;
import kafka.log.LogLoader$;
import kafka.log.LogManager$;
import kafka.log.LogSegments;
import kafka.log.ProducerStateManager;
import kafka.log.UnifiedLog;
import kafka.log.UnifiedLog$;
import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import scala.None$;
import scala.Option;
import scala.collection.immutable.List;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;

/* compiled from: SchedulerTest.scala */
@ScalaSignature(bytes = "\u0006\u0005I4A\u0001F\u000b\u00015!)\u0011\u0005\u0001C\u0001E!9Q\u0005\u0001b\u0001\n\u00031\u0003B\u0002\u0016\u0001A\u0003%q\u0005C\u0004,\u0001\t\u0007I\u0011\u0001\u0017\t\rA\u0002\u0001\u0015!\u0003.\u0011\u001d\t\u0004A1A\u0005\u0002IBaa\u0010\u0001!\u0002\u0013\u0019\u0004b\u0002!\u0001\u0005\u0004%\tA\r\u0005\u0007\u0003\u0002\u0001\u000b\u0011B\u001a\t\u000b\t\u0003A\u0011A\"\t\u000bQ\u0003A\u0011A\"\t\u000be\u0003A\u0011A\"\t\u000by\u0003A\u0011A\"\t\u000b\u0001\u0004A\u0011A\"\t\u000b\t\u0004A\u0011A\"\t\u000b\u0011\u0004A\u0011A\"\t\u000b\u0019\u0004A\u0011A\"\t\u000b!\u0004A\u0011A\"\t\u000b)\u0004A\u0011A\"\u0003\u001bM\u001b\u0007.\u001a3vY\u0016\u0014H+Z:u\u0015\t1r#A\u0003vi&d7OC\u0001\u0019\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\u000e\u0011\u0005qyR\"A\u000f\u000b\u0003y\tQa]2bY\u0006L!\u0001I\u000f\u0003\r\u0005s\u0017PU3g\u0003\u0019a\u0014N\\5u}Q\t1\u0005\u0005\u0002%\u00015\tQ#A\u0005tG\",G-\u001e7feV\tq\u0005\u0005\u0002%Q%\u0011\u0011&\u0006\u0002\u000f\u0017\u000647.Y*dQ\u0016$W\u000f\\3s\u0003)\u00198\r[3ek2,'\u000fI\u0001\t[>\u001c7\u000eV5nKV\tQ\u0006\u0005\u0002%]%\u0011q&\u0006\u0002\t\u001b>\u001c7\u000eV5nK\u0006IQn\\2l)&lW\rI\u0001\tG>,h\u000e^3scU\t1\u0007\u0005\u00025{5\tQG\u0003\u00027o\u00051\u0011\r^8nS\u000eT!\u0001O\u001d\u0002\u0015\r|gnY;se\u0016tGO\u0003\u0002;w\u0005!Q\u000f^5m\u0015\u0005a\u0014\u0001\u00026bm\u0006L!AP\u001b\u0003\u001b\u0005#x.\\5d\u0013:$XmZ3s\u0003%\u0019w.\u001e8uKJ\f\u0004%\u0001\u0005d_VtG/\u001a:3\u0003%\u0019w.\u001e8uKJ\u0014\u0004%A\u0003tKR,\b\u000fF\u0001E!\taR)\u0003\u0002G;\t!QK\\5uQ\tQ\u0001\n\u0005\u0002J%6\t!J\u0003\u0002L\u0019\u0006\u0019\u0011\r]5\u000b\u00055s\u0015a\u00026va&$XM\u001d\u0006\u0003\u001fB\u000bQA[;oSRT\u0011!U\u0001\u0004_J<\u0017BA*K\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001\ti\u0016\f'\u000fZ8x]\"\u00121B\u0016\t\u0003\u0013^K!\u0001\u0017&\u0003\u0013\u00053G/\u001a:FC\u000eD\u0017\u0001\t;fgRlunY6TG\",G-\u001e7fe:{g\u000eU3sS>$\u0017n\u0019+bg.D#\u0001D.\u0011\u0005%c\u0016BA/K\u0005\u0011!Vm\u001d;\u0002;Q,7\u000f^'pG.\u001c6\r[3ek2,'\u000fU3sS>$\u0017n\u0019+bg.D#!D.\u0002AQ,7\u000f\u001e*fK:$(/\u00198u)\u0006\u001c8.\u00138N_\u000e\\7k\u00195fIVdWM\u001d\u0015\u0003\u001dm\u000b1\u0003^3ti:{g\u000eU3sS>$\u0017n\u0019+bg.D#aD.\u0002!Q,7\u000f\u001e)fe&|G-[2UCN\\\u0007F\u0001\t\\\u0003-!Xm\u001d;SKN$\u0018M\u001d;)\u0005EY\u0016A\u0007;fgR,fn]2iK\u0012,H.\u001a)s_\u0012,8-\u001a:UCN\\\u0007F\u0001\n\\\u0003a!Xm\u001d;N_\u000e\\7k\u00195fIVdWM\u001d'pG.Lgn\u001a\u0015\u0003'mCCaE7qcB\u0011\u0011J\\\u0005\u0003_*\u0013q\u0001V5nK>,H/A\u0003wC2,XMH\u0001\u0010\u0001")
/* loaded from: input_file:kafka/utils/SchedulerTest.class */
public class SchedulerTest {
    private final KafkaScheduler scheduler;
    private final MockTime mockTime;
    private final AtomicInteger counter1;
    private final AtomicInteger counter2;

    public KafkaScheduler scheduler() {
        return this.scheduler;
    }

    public MockTime mockTime() {
        return this.mockTime;
    }

    public AtomicInteger counter1() {
        return this.counter1;
    }

    public AtomicInteger counter2() {
        return this.counter2;
    }

    @BeforeEach
    public void setup() {
        scheduler().startup();
    }

    @AfterEach
    public void teardown() {
        scheduler().shutdown();
    }

    @Test
    public void testMockSchedulerNonPeriodicTask() {
        mockTime().scheduler().schedule("test1", () -> {
            this.counter1().getAndIncrement();
        }, 1L, mockTime().scheduler().schedule$default$4(), mockTime().scheduler().schedule$default$5());
        mockTime().scheduler().schedule("test2", () -> {
            this.counter2().getAndIncrement();
        }, 100L, mockTime().scheduler().schedule$default$4(), mockTime().scheduler().schedule$default$5());
        Assertions.assertEquals(0, counter1().get(), "Counter1 should not be incremented prior to task running.");
        Assertions.assertEquals(0, counter2().get(), "Counter2 should not be incremented prior to task running.");
        mockTime().sleep(1L);
        Assertions.assertEquals(1, counter1().get(), "Counter1 should be incremented");
        Assertions.assertEquals(0, counter2().get(), "Counter2 should not be incremented");
        mockTime().sleep(100000L);
        Assertions.assertEquals(1, counter1().get(), "More sleeping should not result in more incrementing on counter1.");
        Assertions.assertEquals(1, counter2().get(), "Counter2 should now be incremented.");
    }

    @Test
    public void testMockSchedulerPeriodicTask() {
        mockTime().scheduler().schedule("test1", () -> {
            this.counter1().getAndIncrement();
        }, 1L, 1L, mockTime().scheduler().schedule$default$5());
        mockTime().scheduler().schedule("test2", () -> {
            this.counter2().getAndIncrement();
        }, 100L, 100L, mockTime().scheduler().schedule$default$5());
        Assertions.assertEquals(0, counter1().get(), "Counter1 should not be incremented prior to task running.");
        Assertions.assertEquals(0, counter2().get(), "Counter2 should not be incremented prior to task running.");
        mockTime().sleep(1L);
        Assertions.assertEquals(1, counter1().get(), "Counter1 should be incremented");
        Assertions.assertEquals(0, counter2().get(), "Counter2 should not be incremented");
        mockTime().sleep(100L);
        Assertions.assertEquals(101, counter1().get(), "Counter1 should be incremented 101 times");
        Assertions.assertEquals(1, counter2().get(), "Counter2 should not be incremented once");
    }

    @Test
    public void testReentrantTaskInMockScheduler() {
        mockTime().scheduler().schedule("test1", () -> {
            this.mockTime().scheduler().schedule("test2", () -> {
                this.counter2().getAndIncrement();
            }, 0L, this.mockTime().scheduler().schedule$default$4(), this.mockTime().scheduler().schedule$default$5());
        }, 1L, mockTime().scheduler().schedule$default$4(), mockTime().scheduler().schedule$default$5());
        mockTime().sleep(1L);
        Assertions.assertEquals(1, counter2().get());
    }

    @Test
    public void testNonPeriodicTask() {
        scheduler().schedule("test", () -> {
            this.counter1().getAndIncrement();
        }, 0L, scheduler().schedule$default$4(), scheduler().schedule$default$5());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long j = 1;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testNonPeriodicTask$2(this);
                Thread.sleep(5L);
                Assertions.assertEquals(1, counter1().get(), "Should only run once");
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis > 30000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                }
                Thread.sleep(j);
                j += package$.MODULE$.min(j, 1000L);
            }
        }
    }

    @Test
    public void testPeriodicTask() {
        scheduler().schedule("test", () -> {
            this.counter1().getAndIncrement();
        }, 0L, 5L, scheduler().schedule$default$5());
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        long j = 1;
        long currentTimeMillis = System.currentTimeMillis();
        while (true) {
            try {
                $anonfun$testPeriodicTask$2(this);
                return;
            } catch (AssertionError e) {
                if (System.currentTimeMillis() - currentTimeMillis > 30000) {
                    throw e;
                }
                if (testUtils$.logger().underlying().isInfoEnabled()) {
                    testUtils$.logger().underlying().info(Logging.msgWithLogIdent$(testUtils$, new StringBuilder(49).append("Attempt failed, sleeping for ").append(j).append(", and then retrying.").toString()));
                }
                Thread.sleep(j);
                j += package$.MODULE$.min(j, 1000L);
            }
        }
    }

    @Test
    public void testRestart() {
        mockTime().scheduler().schedule("test1", () -> {
            this.counter1().getAndIncrement();
        }, 1L, mockTime().scheduler().schedule$default$4(), mockTime().scheduler().schedule$default$5());
        mockTime().sleep(1L);
        Assertions.assertEquals(1, counter1().get());
        mockTime().scheduler().shutdown();
        mockTime().scheduler().startup();
        mockTime().scheduler().schedule("test1", () -> {
            this.counter1().getAndIncrement();
        }, 1L, mockTime().scheduler().schedule$default$4(), mockTime().scheduler().schedule$default$5());
        mockTime().sleep(1L);
        Assertions.assertEquals(2, counter1().get());
    }

    @Test
    public void testUnscheduleProducerTask() {
        TestUtils$ testUtils$ = TestUtils$.MODULE$;
        File randomPartitionLogDir = TestUtils$.MODULE$.randomPartitionLogDir(org.apache.kafka.test.TestUtils.tempDirectory((Path) null, (String) null));
        LogConfig logConfig = new LogConfig(new Properties(), LogConfig$.MODULE$.apply$default$2());
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        UnifiedLog$ unifiedLog$ = UnifiedLog$.MODULE$;
        TopicPartition parseTopicPartitionName = LocalLog$.MODULE$.parseTopicPartitionName(randomPartitionLogDir);
        LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(10);
        LogSegments logSegments = new LogSegments(parseTopicPartitionName);
        Option maybeCreateLeaderEpochCache = UnifiedLog$.MODULE$.maybeCreateLeaderEpochCache(randomPartitionLogDir, parseTopicPartitionName, logDirFailureChannel, logConfig.recordVersion(), "");
        ProducerStateManager producerStateManager = new ProducerStateManager(parseTopicPartitionName, randomPartitionLogDir, 300000, 3600000, mockTime());
        KafkaScheduler scheduler = scheduler();
        MockTime mockTime = mockTime();
        LogLoader$ logLoader$ = LogLoader$.MODULE$;
        LoadedLogOffsets load = new LogLoader(randomPartitionLogDir, parseTopicPartitionName, logConfig, scheduler, mockTime, logDirFailureChannel, true, logSegments, 0L, 0L, maybeCreateLeaderEpochCache, producerStateManager, new ConcurrentHashMap()).load();
        UnifiedLog unifiedLog = new UnifiedLog(load.logStartOffset(), new LocalLog(randomPartitionLogDir, logConfig, logSegments, load.recoveryPoint(), load.nextOffsetMetadata(), scheduler(), mockTime(), parseTopicPartitionName, logDirFailureChannel), brokerTopicStats, LogManager$.MODULE$.ProducerIdExpirationCheckIntervalMs(), maybeCreateLeaderEpochCache, producerStateManager, None$.MODULE$, true);
        Assertions.assertTrue(scheduler().taskRunning(unifiedLog.producerExpireCheck()));
        unifiedLog.close();
        Assertions.assertFalse(scheduler().taskRunning(unifiedLog.producerExpireCheck()));
    }

    @Timeout(15)
    @Test
    public void testMockSchedulerLocking() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(2);
        List list = (List) scala.package$.MODULE$.List().apply(ScalaRunTime$.MODULE$.wrapRefArray(new CountDownLatch[]{new CountDownLatch(1), new CountDownLatch(1)}));
        mockTime().scheduler().schedule("test1", () -> {
            scheduledTask$1((CountDownLatch) list.head(), countDownLatch, countDownLatch2);
        }, 1L, mockTime().scheduler().schedule$default$4(), mockTime().scheduler().schedule$default$5());
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            newSingleThreadScheduledExecutor.scheduleWithFixedDelay(() -> {
                this.mockTime().sleep(1L);
            }, 0L, 1L, TimeUnit.MILLISECONDS);
            Assertions.assertTrue(countDownLatch.await(10L, TimeUnit.SECONDS));
            mockTime().scheduler().schedule("test2", () -> {
                scheduledTask$1((CountDownLatch) list.apply(1), countDownLatch, countDownLatch2);
            }, 1L, mockTime().scheduler().schedule$default$4(), mockTime().scheduler().schedule$default$5());
            list.foreach(countDownLatch3 -> {
                countDownLatch3.countDown();
                return BoxedUnit.UNIT;
            });
            Assertions.assertTrue(countDownLatch2.await(10L, TimeUnit.SECONDS), "Tasks did not complete");
        } finally {
            newSingleThreadScheduledExecutor.shutdownNow();
        }
    }

    public static final /* synthetic */ void $anonfun$testNonPeriodicTask$2(SchedulerTest schedulerTest) {
        Assertions.assertEquals(schedulerTest.counter1().get(), 1);
    }

    public static final /* synthetic */ void $anonfun$testPeriodicTask$2(SchedulerTest schedulerTest) {
        Assertions.assertTrue(schedulerTest.counter1().get() >= 20, "Should count to 20");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static final void scheduledTask$1(CountDownLatch countDownLatch, CountDownLatch countDownLatch2, CountDownLatch countDownLatch3) {
        countDownLatch2.countDown();
        Assertions.assertTrue(countDownLatch.await(30L, TimeUnit.SECONDS), "Timed out waiting for latch");
        countDownLatch3.countDown();
    }

    public SchedulerTest() {
        KafkaScheduler$ kafkaScheduler$ = KafkaScheduler$.MODULE$;
        KafkaScheduler$ kafkaScheduler$2 = KafkaScheduler$.MODULE$;
        this.scheduler = new KafkaScheduler(1, "kafka-scheduler-", true);
        this.mockTime = new MockTime();
        this.counter1 = new AtomicInteger(0);
        this.counter2 = new AtomicInteger(0);
    }
}
