/*
 * Decompiled with CFR 0.152.
 */
package kafka.utils;

import java.io.File;
import java.io.Serializable;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.log.LoadedLogOffsets;
import kafka.log.LocalLog;
import kafka.log.LocalLog$;
import kafka.log.LogConfig;
import kafka.log.LogConfig$;
import kafka.log.LogLoader;
import kafka.log.LogManager$;
import kafka.log.LogOffsetsListener;
import kafka.log.LogSegments;
import kafka.log.MergedLog;
import kafka.log.MergedLog$;
import kafka.log.NoOpLogOffsetsListener$;
import kafka.log.ProducerStateManager;
import kafka.log.TierLogComponents$;
import kafka.server.BrokerTopicStats;
import kafka.server.LogDirFailureChannel;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.TierPartitionState;
import kafka.utils.KafkaScheduler;
import kafka.utils.Logging;
import kafka.utils.MockTime;
import kafka.utils.Scheduler;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import scala.Function0;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.collection.immutable.List;
import scala.collection.immutable.Seq;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.ScalaRunTime$;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0005I4A\u0001F\u000b\u00015!)\u0011\u0005\u0001C\u0001E!9Q\u0005\u0001b\u0001\n\u00031\u0003B\u0002\u0016\u0001A\u0003%q\u0005C\u0004,\u0001\t\u0007I\u0011\u0001\u0017\t\rA\u0002\u0001\u0015!\u0003.\u0011\u001d\t\u0004A1A\u0005\u0002IBaa\u0010\u0001!\u0002\u0013\u0019\u0004b\u0002!\u0001\u0005\u0004%\tA\r\u0005\u0007\u0003\u0002\u0001\u000b\u0011B\u001a\t\u000b\t\u0003A\u0011A\"\t\u000bQ\u0003A\u0011A\"\t\u000be\u0003A\u0011A\"\t\u000by\u0003A\u0011A\"\t\u000b\u0001\u0004A\u0011A\"\t\u000b\t\u0004A\u0011A\"\t\u000b\u0011\u0004A\u0011A\"\t\u000b\u0019\u0004A\u0011A\"\t\u000b!\u0004A\u0011A\"\t\u000b)\u0004A\u0011A\"\u0003\u001bM\u001b\u0007.\u001a3vY\u0016\u0014H+Z:u\u0015\t1r#A\u0003vi&d7OC\u0001\u0019\u0003\u0015Y\u0017MZ6b\u0007\u0001\u0019\"\u0001A\u000e\u0011\u0005qyR\"A\u000f\u000b\u0003y\tQa]2bY\u0006L!\u0001I\u000f\u0003\r\u0005s\u0017PU3g\u0003\u0019a\u0014N\\5u}Q\t1\u0005\u0005\u0002%\u00015\tQ#A\u0005tG\",G-\u001e7feV\tq\u0005\u0005\u0002%Q%\u0011\u0011&\u0006\u0002\u000f\u0017\u000647.Y*dQ\u0016$W\u000f\\3s\u0003)\u00198\r[3ek2,'\u000fI\u0001\t[>\u001c7\u000eV5nKV\tQ\u0006\u0005\u0002%]%\u0011q&\u0006\u0002\t\u001b>\u001c7\u000eV5nK\u0006IQn\\2l)&lW\rI\u0001\tG>,h\u000e^3scU\t1\u0007\u0005\u00025{5\tQG\u0003\u00027o\u00051\u0011\r^8nS\u000eT!\u0001O\u001d\u0002\u0015\r|gnY;se\u0016tGO\u0003\u0002;w\u0005!Q\u000f^5m\u0015\u0005a\u0014\u0001\u00026bm\u0006L!AP\u001b\u0003\u001b\u0005#x.\\5d\u0013:$XmZ3s\u0003%\u0019w.\u001e8uKJ\f\u0004%\u0001\u0005d_VtG/\u001a:3\u0003%\u0019w.\u001e8uKJ\u0014\u0004%A\u0003tKR,\b\u000fF\u0001E!\taR)\u0003\u0002G;\t!QK\\5uQ\tQ\u0001\n\u0005\u0002J%6\t!J\u0003\u0002L\u0019\u0006\u0019\u0011\r]5\u000b\u00055s\u0015a\u00026va&$XM\u001d\u0006\u0003\u001fB\u000bQA[;oSRT\u0011!U\u0001\u0004_J<\u0017BA*K\u0005)\u0011UMZ8sK\u0016\u000b7\r[\u0001\ti\u0016\f'\u000fZ8x]\"\u00121B\u0016\t\u0003\u0013^K!\u0001\u0017&\u0003\u0013\u00053G/\u001a:FC\u000eD\u0017\u0001\t;fgRlunY6TG\",G-\u001e7fe:{g\u000eU3sS>$\u
0017n\u0019+bg.D#\u0001D.\u0011\u0005%c\u0016BA/K\u0005\u0011!Vm\u001d;\u0002;Q,7\u000f^'pG.\u001c6\r[3ek2,'\u000fU3sS>$\u0017n\u0019+bg.D#!D.\u0002AQ,7\u000f\u001e*fK:$(/\u00198u)\u0006\u001c8.\u00138N_\u000e\\7k\u00195fIVdWM\u001d\u0015\u0003\u001dm\u000b1\u0003^3ti:{g\u000eU3sS>$\u0017n\u0019+bg.D#aD.\u0002!Q,7\u000f\u001e)fe&|G-[2UCN\\\u0007F\u0001\t\\\u0003-!Xm\u001d;SKN$\u0018M\u001d;)\u0005EY\u0016A\u0007;fgR,fn]2iK\u0012,H.\u001a)s_\u0012,8-\u001a:UCN\\\u0007F\u0001\n\\\u0003a!Xm\u001d;N_\u000e\\7k\u00195fIVdWM\u001d'pG.Lgn\u001a\u0015\u0003'mCCaE7qcB\u0011\u0011J\\\u0005\u0003_*\u0013q\u0001V5nK>,H/A\u0003wC2,XMH\u0001\u0010\u0001")
public class SchedulerTest {
    /*
     * Tests for two schedulers: the KafkaScheduler held in `scheduler` and the
     * scheduler exposed by MockTime (`mockTime().scheduler()`).
     *
     * This source was decompiled from a Scala class (see the @ScalaSignature
     * annotation), which is why fields are read through accessor methods and
     * why synthetic members such as `$anonfun$...` and `schedule$default$N()`
     * appear.
     */

    /** Scheduler under test; started in {@link #setup()} and shut down in {@link #teardown()}. */
    private final KafkaScheduler scheduler = new KafkaScheduler(1, "kafka-scheduler-", true, false);
    /** Mock clock whose embedded scheduler is driven by {@code mockTime().sleep(...)} in these tests. */
    private final MockTime mockTime = new MockTime();
    /** Incremented by the first scheduled task of a test. */
    private final AtomicInteger counter1 = new AtomicInteger(0);
    /** Incremented by the second scheduled task of a test. */
    private final AtomicInteger counter2 = new AtomicInteger(0);

    /** @return the {@link KafkaScheduler} instance shared by the tests of this class instance */
    public KafkaScheduler scheduler() {
        return this.scheduler;
    }

    /** @return the {@link MockTime} instance shared by the tests of this class instance */
    public MockTime mockTime() {
        return this.mockTime;
    }

    /** @return the counter incremented by the first scheduled task */
    public AtomicInteger counter1() {
        return this.counter1;
    }

    /** @return the counter incremented by the second scheduled task */
    public AtomicInteger counter2() {
        return this.counter2;
    }

    /** Starts the {@link KafkaScheduler} before each test. */
    @BeforeEach
    public void setup() {
        this.scheduler().startup();
    }

    /** Shuts the {@link KafkaScheduler} down after each test. */
    @AfterEach
    public void teardown() {
        this.scheduler().shutdown();
    }

    /**
     * Schedules two tasks on the mock scheduler with delays 1 and 100 and the default period
     * ({@code schedule$default$4()}), then checks that each counter is incremented exactly once,
     * and only after the mock clock has been advanced far enough.
     */
    @Test
    public void testMockSchedulerNonPeriodicTask() {
        this.mockTime().scheduler().schedule("test1", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> this.counter1().getAndIncrement(), 1L, this.mockTime().scheduler().schedule$default$4(), this.mockTime().scheduler().schedule$default$5());
        this.mockTime().scheduler().schedule("test2", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> this.counter2().getAndIncrement(), 100L, this.mockTime().scheduler().schedule$default$4(), this.mockTime().scheduler().schedule$default$5());
        Assertions.assertEquals(0, this.counter1().get(), "Counter1 should not be incremented prior to task running.");
        Assertions.assertEquals(0, this.counter2().get(), "Counter2 should not be incremented prior to task running.");
        this.mockTime().sleep(1L);
        Assertions.assertEquals(1, this.counter1().get(), "Counter1 should be incremented");
        Assertions.assertEquals(0, this.counter2().get(), "Counter2 should not be incremented");
        this.mockTime().sleep(100000L);
        Assertions.assertEquals(1, this.counter1().get(), "More sleeping should not result in more incrementing on counter1.");
        Assertions.assertEquals(1, this.counter2().get(), "Counter2 should now be incremented.");
    }

    /**
     * Schedules two tasks on the mock scheduler with (delay, period) of (1, 1) and (100, 100).
     * After sleeping 1 the first counter is expected to be 1; after sleeping a further 100 the
     * counters are expected to be 101 and 1 respectively.
     */
    @Test
    public void testMockSchedulerPeriodicTask() {
        this.mockTime().scheduler().schedule("test1", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> this.counter1().getAndIncrement(), 1L, 1L, this.mockTime().scheduler().schedule$default$5());
        this.mockTime().scheduler().schedule("test2", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> this.counter2().getAndIncrement(), 100L, 100L, this.mockTime().scheduler().schedule$default$5());
        Assertions.assertEquals(0, this.counter1().get(), "Counter1 should not be incremented prior to task running.");
        Assertions.assertEquals(0, this.counter2().get(), "Counter2 should not be incremented prior to task running.");
        this.mockTime().sleep(1L);
        Assertions.assertEquals(1, this.counter1().get(), "Counter1 should be incremented");
        Assertions.assertEquals(0, this.counter2().get(), "Counter2 should not be incremented");
        this.mockTime().sleep(100L);
        Assertions.assertEquals(101, this.counter1().get(), "Counter1 should be incremented 101 times");
        // The assertion expects exactly one run, so the message must say so.
        Assertions.assertEquals(1, this.counter2().get(), "Counter2 should be incremented once");
    }

    /**
     * Schedules a task ("test1", delay 1) whose body itself schedules another task ("test2",
     * delay 0) on the same mock scheduler, and expects the inner task to have run once after
     * the mock clock is advanced by 1.
     */
    @Test
    public void testReentrantTaskInMockScheduler() {
        this.mockTime().scheduler().schedule("test1", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> this.mockTime().scheduler().schedule("test2", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> this.counter2().getAndIncrement(), 0L, this.mockTime().scheduler().schedule$default$4(), this.mockTime().scheduler().schedule$default$5()), 1L, this.mockTime().scheduler().schedule$default$4(), this.mockTime().scheduler().schedule$default$5());
        this.mockTime().sleep(1L);
        Assertions.assertEquals(1, this.counter2().get());
    }

    /**
     * Schedules a task with delay 0 and the default period on the {@link KafkaScheduler}, waits
     * (with retries, up to 30 s) for the counter to reach 1, then sleeps briefly and checks the
     * counter is still 1.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping
     */
    @Test
    public void testNonPeriodicTask() throws InterruptedException {
        this.scheduler().schedule("test", (Function0)(JFunction0.mcV.sp & Serializable)() -> this.counter1().getAndIncrement(), 0L, this.scheduler().schedule$default$4(), this.scheduler().schedule$default$5());
        SchedulerTest.retryUntilPasses(30000L, () -> SchedulerTest.$anonfun$testNonPeriodicTask$2(this));
        // Give a wrongly-repeating task a chance to run again before the final check.
        Thread.sleep(5L);
        Assertions.assertEquals(1, this.counter1().get(), "Should only run once");
    }

    /**
     * Schedules a task with delay 0 and period 5 on the {@link KafkaScheduler} and waits (with
     * retries, up to 30 s) until the counter has reached at least 20.
     *
     * @throws InterruptedException if the test thread is interrupted while sleeping between retries
     */
    @Test
    public void testPeriodicTask() throws InterruptedException {
        this.scheduler().schedule("test", (Function0)(JFunction0.mcV.sp & Serializable)() -> this.counter1().getAndIncrement(), 0L, 5L, this.scheduler().schedule$default$5());
        SchedulerTest.retryUntilPasses(30000L, () -> SchedulerTest.$anonfun$testPeriodicTask$2(this));
    }

    /**
     * Runs a task on the mock scheduler, calls {@code shutdown()} followed by {@code startup()}
     * on it, and checks that a task scheduled afterwards still runs.
     */
    @Test
    public void testRestart() {
        this.mockTime().scheduler().schedule("test1", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> this.counter1().getAndIncrement(), 1L, this.mockTime().scheduler().schedule$default$4(), this.mockTime().scheduler().schedule$default$5());
        this.mockTime().sleep(1L);
        Assertions.assertEquals(1, this.counter1().get());
        this.mockTime().scheduler().shutdown();
        this.mockTime().scheduler().startup();
        this.mockTime().scheduler().schedule("test1", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> this.counter1().getAndIncrement(), 1L, this.mockTime().scheduler().schedule$default$4(), this.mockTime().scheduler().schedule$default$5());
        this.mockTime().sleep(1L);
        Assertions.assertEquals(2, this.counter1().get());
    }

    /**
     * Builds a {@link MergedLog} in a temporary directory using the real {@link KafkaScheduler}
     * and checks that {@code scheduler().taskRunning(log.producerExpireCheck())} is true while
     * the log is open and false once the log has been closed.
     */
    @Test
    public void testUnscheduleProducerTask() {
        File tmpDir = TestUtils.tempDirectory(null, null);
        File logDir = TestUtils$.MODULE$.randomPartitionLogDir(tmpDir);
        LogConfig logConfig = new LogConfig((Map)new Properties(), LogConfig$.MODULE$.apply$default$2());
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        Metrics metrics = new Metrics();
        int maxTransactionTimeoutMs = 300000;
        int maxProducerIdExpirationMs = 3600000;
        TopicPartition topicPartition = LocalLog$.MODULE$.parseTopicPartitionName(logDir);
        LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(10);
        LogSegments segments = new LogSegments(topicPartition);
        Option leaderEpochCache = MergedLog$.MODULE$.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion(), "");
        ProducerStateManager producerStateManager = new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs, maxProducerIdExpirationMs, (Time)this.mockTime());
        LoadedLogOffsets offsets = new LogLoader(logDir, topicPartition, logConfig, (Scheduler)this.scheduler(), (Time)this.mockTime(), logDirFailureChannel, true, segments, 0L, 0L, leaderEpochCache, producerStateManager).load();
        LocalLog localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint(), offsets.nextOffsetMetadata(), (Scheduler)this.scheduler(), (Time)this.mockTime(), topicPartition, logDirFailureChannel, brokerTopicStats, (LogOffsetsListener)NoOpLogOffsetsListener$.MODULE$);
        // Note: the tier partition state is given the mock scheduler, unlike the log itself.
        FileTierPartitionState tierPartitionState = new FileTierPartitionState(logDir, logDirFailureChannel, topicPartition, false, (Scheduler)this.mockTime().scheduler(), false, true, false, (Time)this.mockTime());
        MergedLog log = new MergedLog(localLog, 0L, true, metrics, leaderEpochCache, LogManager$.MODULE$.ProducerIdExpirationCheckIntervalMs(), producerStateManager, (Option)None$.MODULE$, true, (TierPartitionState)tierPartitionState, TierLogComponents$.MODULE$.EMPTY(), (Option)None$.MODULE$);
        Assertions.assertTrue(this.scheduler().taskRunning(log.producerExpireCheck()));
        log.close();
        Assertions.assertFalse(this.scheduler().taskRunning(log.producerExpireCheck()));
    }

    /**
     * Schedules a first blocking task on the mock scheduler, drives the mock clock from a
     * background executor every millisecond, and, once the first task has signalled that it
     * started, schedules a second blocking task before releasing both. Both tasks must then
     * complete within 10 seconds; the whole test is bounded by the 15 second {@link Timeout}.
     *
     * NOTE(review): judging by the test name this guards against the mock scheduler holding a
     * lock while a task body runs - confirm against the mock scheduler implementation.
     *
     * @throws InterruptedException if the test thread is interrupted while awaiting a latch
     */
    @Timeout(value=15L)
    @Test
    public void testMockSchedulerLocking() throws InterruptedException {
        CountDownLatch initLatch = new CountDownLatch(1);
        CountDownLatch completionLatch = new CountDownLatch(2);
        List taskLatches = (List)package$.MODULE$.List().apply((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new CountDownLatch[]{new CountDownLatch(1), new CountDownLatch(1)}));
        this.mockTime().scheduler().schedule("test1", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> SchedulerTest.scheduledTask$1((CountDownLatch)taskLatches.head(), initLatch, completionLatch), 1L, this.mockTime().scheduler().schedule$default$4(), this.mockTime().scheduler().schedule$default$5());
        ScheduledExecutorService tickExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            // Advance the mock clock by 1 on every tick so that due tasks get executed.
            tickExecutor.scheduleWithFixedDelay(() -> this.mockTime().sleep(1L), 0L, 1L, TimeUnit.MILLISECONDS);
            Assertions.assertTrue(initLatch.await(10L, TimeUnit.SECONDS));
            // At this point the first task has started but its latch has not been released yet.
            this.mockTime().scheduler().schedule("test2", (Function0<BoxedUnit>)(JFunction0.mcV.sp & Serializable)() -> SchedulerTest.scheduledTask$1((CountDownLatch)taskLatches.apply(1), initLatch, completionLatch), 1L, this.mockTime().scheduler().schedule$default$4(), this.mockTime().scheduler().schedule$default$5());
            taskLatches.foreach((Function1 & Serializable)latch -> {
                // The list is raw, so the element has to be cast back to its real type.
                ((CountDownLatch)latch).countDown();
                return BoxedUnit.UNIT;
            });
            Assertions.assertTrue(completionLatch.await(10L, TimeUnit.SECONDS), "Tasks did not complete");
        }
        finally {
            tickExecutor.shutdownNow();
        }
    }

    /**
     * Retried assertion of {@link #testNonPeriodicTask()}: the first counter must equal 1.
     *
     * @param $this the test instance whose counter is checked
     */
    public static final /* synthetic */ void $anonfun$testNonPeriodicTask$2(SchedulerTest $this) {
        // JUnit's contract is assertEquals(expected, actual).
        Assertions.assertEquals(1, $this.counter1().get());
    }

    /**
     * Retried assertion of {@link #testPeriodicTask()}: the first counter must be at least 20.
     *
     * @param $this the test instance whose counter is checked
     */
    public static final /* synthetic */ void $anonfun$testPeriodicTask$2(SchedulerTest $this) {
        Assertions.assertTrue($this.counter1().get() >= 20, "Should count to 20");
    }

    /**
     * Body of the tasks used by {@link #testMockSchedulerLocking()}: signals that it has started,
     * waits up to 30 seconds for its own latch, then signals completion.
     *
     * @param taskLatch latch this task blocks on until the test releases it
     * @param initLatch$1 counted down as soon as the task starts
     * @param completionLatch$1 counted down once the task has been released
     */
    private static final void scheduledTask$1(CountDownLatch taskLatch, CountDownLatch initLatch$1, CountDownLatch completionLatch$1) {
        initLatch$1.countDown();
        boolean released;
        try {
            released = taskLatch.await(30L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            // Callers are Scala function lambdas that cannot throw checked exceptions:
            // keep the interrupt flag set and surface the interruption as a failure.
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for latch", e);
        }
        Assertions.assertTrue(released, "Timed out waiting for latch");
        completionLatch$1.countDown();
    }

    /**
     * Runs {@code assertion} repeatedly until it stops throwing {@link AssertionError}.
     * Between attempts the thread sleeps, starting at 1 ms and growing by
     * {@code min(currentWait, 1000)} ms after each failure. Once more than {@code maxWaitMs}
     * milliseconds have elapsed since the first attempt, the latest error is rethrown.
     *
     * @param maxWaitMs time budget in milliseconds after which a failure is propagated
     * @param assertion check to execute; only {@link AssertionError} triggers a retry
     * @throws InterruptedException if the thread is interrupted while sleeping between attempts
     */
    private static void retryUntilPasses(long maxWaitMs, Runnable assertion) throws InterruptedException {
        TestUtils$ testUtils = TestUtils$.MODULE$;
        long waitMs = 1L;
        long startTime = System.currentTimeMillis();
        while (true) {
            try {
                assertion.run();
                return;
            }
            catch (AssertionError e) {
                if (System.currentTimeMillis() - startTime > maxWaitMs) {
                    throw e;
                }
                if (testUtils.logger().underlying().isInfoEnabled()) {
                    String message = "Attempt failed, sleeping for " + waitMs + ", and then retrying.";
                    testUtils.logger().underlying().info(Logging.msgWithLogIdent$((Logging)testUtils, (String)message));
                }
                Thread.sleep(waitMs);
                waitMs += Math.min(waitMs, 1000L);
            }
        }
    }
}

