/*
 * Decompiled with CFR 0.152.
 */
package kafka.utils;

import io.confluent.kafka.storage.checksum.ChecksumParams;
import java.io.File;
import java.io.Serializable;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.log.LocalLog;
import kafka.log.LocalLog$;
import kafka.log.LogLoader;
import kafka.log.MergedLog;
import kafka.log.MergedLog$;
import kafka.log.TierLogComponents;
import kafka.log.TierLogComponents$;
import kafka.server.BrokerTopicStats;
import kafka.server.Defaults$;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.TierPartitionState;
import kafka.tier.state.TierPartitionStateCleanupConfig;
import kafka.utils.Logging;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.KafkaScheduler;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.log.LoadedLogOffsets;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.LogOffsetsListener;
import org.apache.kafka.storage.internals.log.LogSegments;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.collection.immutable.;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.math.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

@ScalaSignature(bytes="\u0006\u0005\u0005ua\u0001B\f\u0019\u0001uAQ\u0001\n\u0001\u0005\u0002\u0015Bq\u0001\u000b\u0001C\u0002\u0013\u0005\u0011\u0006\u0003\u00048\u0001\u0001\u0006IA\u000b\u0005\bq\u0001\u0011\r\u0011\"\u0001:\u0011\u0019i\u0004\u0001)A\u0005u!9a\b\u0001b\u0001\n\u0003y\u0004BB&\u0001A\u0003%\u0001\tC\u0004M\u0001\t\u0007I\u0011A \t\r5\u0003\u0001\u0015!\u0003A\u0011\u001dq\u0005A1A\u0005\u0002=Ca!\u0018\u0001!\u0002\u0013\u0001\u0006\"\u00020\u0001\t\u0003y\u0006\"\u00028\u0001\t\u0003y\u0006\"B:\u0001\t\u0003y\u0006\"\u0002=\u0001\t\u0003y\u0006\"\u0002>\u0001\t\u0003y\u0006\"\u0002?\u0001\t\u0003y\u0006\"\u0002@\u0001\t\u0003y\u0006BBA\u0001\u0001\u0011\u0005q\f\u0003\u0004\u0002\u0006\u0001!\ta\u0018\u0005\u0007\u0003\u0013\u0001A\u0011A0\t\r\u00055\u0001\u0001\"\u0001`\u00055\u00196\r[3ek2,'\u000fV3ti*\u0011\u0011DG\u0001\u0006kRLGn\u001d\u0006\u00027\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u001f!\ty\"%D\u0001!\u0015\u0005\t\u0013!B:dC2\f\u0017BA\u0012!\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\"\u0012A\n\t\u0003O\u0001i\u0011\u0001G\u0001\ng\u000eDW\rZ;mKJ,\u0012A\u000b\t\u0003WUj\u0011\u0001\f\u0006\u0003[9\nA!\u001e;jY*\u0011q\u0006M\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0005m\t$B\u0001\u001a4\u0003\u0019\t\u0007/Y2iK*\tA'A\u0002pe\u001eL!A\u000e\u0017\u0003\u001d-\u000bgm[1TG\",G-\u001e7fe\u0006Q1o\u00195fIVdWM\u001d\u0011\u0002\u00115|7m\u001b+j[\u0016,\u0012A\u000f\t\u0003WmJ!\u0001\u0010\u0017\u0003\u00115{7m\u001b+j[\u0016\f\u0011\"\\8dWRKW.\u001a\u0011\u0002\u0011\r|WO\u001c;feF*\u0012\u0001\u0011\t\u0003\u0003&k\u0011A\u0011\u0006\u0003\u0007\u0012\u000ba!\u0019;p[&\u001c'BA#G\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003[\u001dS\u0011\u0001S\u0001\u0005U\u00064\u0018-\u0003\u0002K\u0005\ni\u0011\t^8nS\u000eLe\u000e^3hKJ\f\u0011bY8v]R,'/\r\u0011\u0002\u0011\r|WO\u001c;feJ\n\u0011bY8v]R,'O\r\u0011\u0002\u001d\rDWmY6tk6\u0004\u0016M]1ngV\t\u0001\u000b\u0005\u0002R76\t!K\u0003\u0002T)\u0006A1\r[3dWN,XN\u0003\u0002V-\u000691\u0
00f^8sC\u001e,'BA\u000eX\u0015\tA\u0016,A\u0005d_:4G.^3oi*\t!,\u0001\u0002j_&\u0011AL\u0015\u0002\u000f\u0007\",7m[:v[B\u000b'/Y7t\u0003=\u0019\u0007.Z2lgVl\u0007+\u0019:b[N\u0004\u0013!B:fiV\u0004H#\u00011\u0011\u0005}\t\u0017B\u00012!\u0005\u0011)f.\u001b;)\u00051!\u0007CA3m\u001b\u00051'BA4i\u0003\r\t\u0007/\u001b\u0006\u0003S*\fqA[;qSR,'O\u0003\u0002lg\u0005)!.\u001e8ji&\u0011QN\u001a\u0002\u000b\u0005\u00164wN]3FC\u000eD\u0017\u0001\u0003;fCJ$wn\u001e8)\u00055\u0001\bCA3r\u0013\t\u0011hMA\u0005BMR,'/R1dQ\u0006\u0001C/Z:u\u001b>\u001c7nU2iK\u0012,H.\u001a:O_:\u0004VM]5pI&\u001cG+Y:lQ\tqQ\u000f\u0005\u0002fm&\u0011qO\u001a\u0002\u0005)\u0016\u001cH/A\u000fuKN$Xj\\2l'\u000eDW\rZ;mKJ\u0004VM]5pI&\u001cG+Y:lQ\tyQ/\u0001\u0011uKN$(+Z3oiJ\fg\u000e\u001e+bg.Le.T8dWN\u001b\u0007.\u001a3vY\u0016\u0014\bF\u0001\tv\u0003M!Xm\u001d;O_:\u0004VM]5pI&\u001cG+Y:lQ\t\tR/A\u0012uKN$hj\u001c8QKJLw\u000eZ5d)\u0006\u001c8n\u00165f]B+'/[8e\u0013NTVM]8)\u0005I)\u0018\u0001\u0005;fgR\u0004VM]5pI&\u001cG+Y:lQ\t\u0019R/A\u0006uKN$(+Z:uCJ$\bF\u0001\u000bv\u0003i!Xm\u001d;V]N\u001c\u0007.\u001a3vY\u0016\u0004&o\u001c3vG\u0016\u0014H+Y:lQ\t)R/\u0001\ruKN$Xj\\2l'\u000eDW\rZ;mKJdunY6j]\u001eDsAFA\t\u0003/\tI\u0002E\u0002f\u0003'I1!!\u0006g\u0005\u001d!\u0016.\\3pkR\fQA^1mk\u0016t\u0012a\u0004\u0015\u0003-U\u0004")
public class SchedulerTest {
    // Real scheduler used by the non-mock tests; started in setup() and shut down in teardown().
    // NOTE(review): the constructor argument 1 is presumably the thread count — confirm against KafkaScheduler.
    private final KafkaScheduler scheduler = new KafkaScheduler(1);
    // Mock time source; the mock-scheduler tests schedule tasks on its `scheduler` field and
    // call sleep() on it before asserting on the counters.
    private final MockTime mockTime = new MockTime();
    // Counters incremented by the tasks the tests schedule; the tests assert on their values.
    private final AtomicInteger counter1 = new AtomicInteger(0);
    private final AtomicInteger counter2 = new AtomicInteger(0);
    // Checksum parameters obtained from the shared test utilities and handed to the log
    // components constructed in testUnscheduleProducerTask().
    private final ChecksumParams checksumParams = TestUtils$.MODULE$.createChecksumParams();

    /** Returns the {@link KafkaScheduler} instance held by this test. */
    public KafkaScheduler scheduler() {
        return scheduler;
    }

    /** Returns the {@link MockTime} instance held by this test. */
    public MockTime mockTime() {
        return mockTime;
    }

    /** Returns the first task-run counter. */
    public AtomicInteger counter1() {
        return counter1;
    }

    /** Returns the second task-run counter. */
    public AtomicInteger counter2() {
        return counter2;
    }

    /** Returns the {@link ChecksumParams} created for this test instance. */
    public ChecksumParams checksumParams() {
        return checksumParams;
    }

    /** Starts the real scheduler before each test. */
    @BeforeEach
    public void setup() {
        KafkaScheduler realScheduler = scheduler();
        realScheduler.startup();
    }

    /** Shuts the real scheduler down after each test. */
    @AfterEach
    public void teardown() {
        KafkaScheduler realScheduler = scheduler();
        realScheduler.shutdown();
    }

    /**
     * Schedules two one-shot tasks on the mock scheduler ("test1" with argument 1, "test2" with
     * argument 100 — presumably the delay in ms) and checks that neither runs before mock time
     * is advanced, that only the first runs after sleep(1), and that each has run exactly once
     * after a further sleep(100000).
     */
    @Test
    public void testMockSchedulerNonPeriodicTask() {
        MockTime time = mockTime();
        time.scheduler.scheduleOnce("test1", () -> counter1().getAndIncrement(), 1L);
        time.scheduler.scheduleOnce("test2", () -> counter2().getAndIncrement(), 100L);

        // Nothing may run until mock time moves.
        Assertions.assertEquals(0, counter1().get(), "Counter1 should not be incremented prior to task running.");
        Assertions.assertEquals(0, counter2().get(), "Counter2 should not be incremented prior to task running.");

        time.sleep(1L);
        Assertions.assertEquals(1, counter1().get(), "Counter1 should be incremented");
        Assertions.assertEquals(0, counter2().get(), "Counter2 should not be incremented");

        time.sleep(100000L);
        Assertions.assertEquals(1, counter1().get(), "More sleeping should not result in more incrementing on counter1.");
        Assertions.assertEquals(1, counter2().get(), "Counter2 should now be incremented.");
    }

    /**
     * Schedules two repeating tasks on the mock scheduler — "test1" with arguments (1, 1) and
     * "test2" with arguments (100, 100), presumably (delay, period) — and checks the run counts
     * as mock time is advanced: nothing before any sleep, one run of test1 after sleep(1), and
     * 101 runs of test1 plus one run of test2 after a further sleep(100).
     */
    @Test
    public void testMockSchedulerPeriodicTask() {
        MockTime time = mockTime();
        time.scheduler.schedule("test1", () -> counter1().getAndIncrement(), 1L, 1L);
        time.scheduler.schedule("test2", () -> counter2().getAndIncrement(), 100L, 100L);

        Assertions.assertEquals(0, counter1().get(), "Counter1 should not be incremented prior to task running.");
        Assertions.assertEquals(0, counter2().get(), "Counter2 should not be incremented prior to task running.");

        time.sleep(1L);
        Assertions.assertEquals(1, counter1().get(), "Counter1 should be incremented");
        Assertions.assertEquals(0, counter2().get(), "Counter2 should not be incremented");

        time.sleep(100L);
        Assertions.assertEquals(101, counter1().get(), "Counter1 should be incremented 101 times");
        // The check expects exactly one run, so the failure message must say so
        // (it previously read "should not be incremented once").
        Assertions.assertEquals(1, counter2().get(), "Counter2 should be incremented once");
    }

    /**
     * Schedules a task on the mock scheduler whose body itself schedules a second task (with
     * argument 0) on the same scheduler, then checks that a single sleep(1) is enough for the
     * nested task to have incremented counter2.
     */
    @Test
    public void testReentrantTaskInMockScheduler() {
        // The outer lambda must capture `this`; the decompiled `$this` is not defined in an
        // instance method and does not compile.
        this.mockTime().scheduler.scheduleOnce(
            "test1",
            () -> this.mockTime().scheduler.scheduleOnce("test2", () -> this.counter2().getAndIncrement(), 0L),
            1L);
        this.mockTime().sleep(1L);
        Assertions.assertEquals(1, this.counter2().get());
    }

    /**
     * Schedules a one-shot task on the real scheduler and verifies that it runs exactly once.
     *
     * <p>The "counter1 equals 1" assertion is polled for up to 30 seconds. Between attempts the
     * thread sleeps for a back-off that starts at 1 ms and doubles until the increment is capped
     * at 1000 ms (this appears to be an inlined copy of a TestUtils retry helper). Once the
     * assertion passes, the test waits a further 5 ms and checks the counter is still 1.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Test
    public void testNonPeriodicTask() throws InterruptedException {
        scheduler().scheduleOnce("test", () -> counter1().getAndIncrement());

        final long maxWaitMs = 30000L;
        final TestUtils$ testUtils = TestUtils$.MODULE$;
        long backoffMs = 1L;
        final long startMs = System.currentTimeMillis();
        while (true) {
            try {
                SchedulerTest.$anonfun$testNonPeriodicTask$2(this);
                break;
            } catch (AssertionError lastFailure) {
                // Give up with the most recent failure once the deadline has passed.
                if (System.currentTimeMillis() - startMs > maxWaitMs) {
                    throw lastFailure;
                }
                if (testUtils.logger().underlying().isInfoEnabled()) {
                    String message = "Attempt failed, sleeping for " + backoffMs + ", and then retrying.";
                    testUtils.logger().underlying().info(Logging.msgWithLogIdent$((Logging)testUtils, message));
                }
                Thread.sleep(backoffMs);
                backoffMs += Math.min(backoffMs, 1000L);
            }
        }

        // Leave a short window in which an erroneous second run would show up.
        Thread.sleep(5L);
        Assertions.assertEquals(1, counter1().get(), "Should only run once");
    }

    /**
     * Schedules a task on the real scheduler via schedule(name, task, 0, 0) and verifies that
     * it runs exactly once (the method name indicates the last argument is the period).
     *
     * <p>The "counter1 equals 1" assertion is polled for up to 30 seconds. Between attempts the
     * thread sleeps for a back-off that starts at 1 ms and doubles until the increment is capped
     * at 1000 ms (this appears to be an inlined copy of a TestUtils retry helper). Once the
     * assertion passes, the test waits a further 5 ms and checks the counter is still 1.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Test
    public void testNonPeriodicTaskWhenPeriodIsZero() throws InterruptedException {
        scheduler().schedule("test", () -> counter1().getAndIncrement(), 0L, 0L);

        final long maxWaitMs = 30000L;
        final TestUtils$ testUtils = TestUtils$.MODULE$;
        long backoffMs = 1L;
        final long startMs = System.currentTimeMillis();
        while (true) {
            try {
                SchedulerTest.$anonfun$testNonPeriodicTaskWhenPeriodIsZero$2(this);
                break;
            } catch (AssertionError lastFailure) {
                // Give up with the most recent failure once the deadline has passed.
                if (System.currentTimeMillis() - startMs > maxWaitMs) {
                    throw lastFailure;
                }
                if (testUtils.logger().underlying().isInfoEnabled()) {
                    String message = "Attempt failed, sleeping for " + backoffMs + ", and then retrying.";
                    testUtils.logger().underlying().info(Logging.msgWithLogIdent$((Logging)testUtils, message));
                }
                Thread.sleep(backoffMs);
                backoffMs += Math.min(backoffMs, 1000L);
            }
        }

        // Leave a short window in which an erroneous second run would show up.
        Thread.sleep(5L);
        Assertions.assertEquals(1, counter1().get(), "Should only run once");
    }

    /**
     * Schedules a repeating task on the real scheduler via schedule(name, task, 0, 5) and waits
     * until it has run at least 20 times.
     *
     * <p>The "counter1 >= 20" assertion is polled for up to 30 seconds. Between attempts the
     * thread sleeps for a back-off that starts at 1 ms and doubles until the increment is capped
     * at 1000 ms (this appears to be an inlined copy of a TestUtils retry helper).
     *
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Test
    public void testPeriodicTask() throws InterruptedException {
        scheduler().schedule("test", () -> counter1().getAndIncrement(), 0L, 5L);

        final long maxWaitMs = 30000L;
        final TestUtils$ testUtils = TestUtils$.MODULE$;
        long backoffMs = 1L;
        final long startMs = System.currentTimeMillis();
        while (true) {
            try {
                SchedulerTest.$anonfun$testPeriodicTask$2(this);
                break;
            } catch (AssertionError lastFailure) {
                // Give up with the most recent failure once the deadline has passed.
                if (System.currentTimeMillis() - startMs > maxWaitMs) {
                    throw lastFailure;
                }
                if (testUtils.logger().underlying().isInfoEnabled()) {
                    String message = "Attempt failed, sleeping for " + backoffMs + ", and then retrying.";
                    testUtils.logger().underlying().info(Logging.msgWithLogIdent$((Logging)testUtils, message));
                }
                Thread.sleep(backoffMs);
                backoffMs += Math.min(backoffMs, 1000L);
            }
        }
    }

    /**
     * Checks that the mock scheduler still runs tasks after a shutdown() followed by startup():
     * a one-shot task is scheduled and run before the restart, and another one after it.
     */
    @Test
    public void testRestart() {
        MockTime time = mockTime();

        // First run, before the restart.
        time.scheduler.scheduleOnce("test1", () -> counter1().getAndIncrement(), 1L);
        time.sleep(1L);
        Assertions.assertEquals(1, counter1().get());

        time.scheduler.shutdown();
        time.scheduler.startup();

        // Second run, after the restart.
        time.scheduler.scheduleOnce("test1", () -> counter1().getAndIncrement(), 1L);
        time.sleep(1L);
        Assertions.assertEquals(2, counter1().get());
    }

    /**
     * Builds a MergedLog in a fresh temporary directory and checks that the real scheduler
     * reports the log's producerExpireCheck() task as running before log.close() and as not
     * running afterwards.
     */
    @Test
    public void testUnscheduleProducerTask() {
        File tmpDir = TestUtils.tempDirectory(null, null);
        TestUtils$ randomPartitionLogDir_this = TestUtils$.MODULE$;
        // Decompiler artifact: never read.
        Object var27_2 = null;
        File logDir = randomPartitionLogDir_this.randomPartitionForTopicLogDir(tmpDir, "kafka");
        // Log configuration built from an empty Properties object.
        LogConfig logConfig = new LogConfig((Map)new Properties());
        BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
        Metrics metrics = new Metrics();
        int maxTransactionTimeoutMs = 300000;
        int maxProducerIdExpirationMs = Defaults$.MODULE$.ProducerIdExpirationMs();
        int producerIdExpirationCheckIntervalMs = Defaults$.MODULE$.ProducerIdExpirationCheckIntervalMs();
        // The topic partition is derived from the log directory chosen above.
        TopicPartition topicPartition = LocalLog$.MODULE$.parseTopicPartitionName(logDir);
        LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(10);
        LogSegments segments = new LogSegments(topicPartition);
        Optional leaderEpochCache = MergedLog$.MODULE$.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion(), "");
        ProducerStateManager producerStateManager = new ProducerStateManager(topicPartition, logDir, maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, false), (Time)this.mockTime(), Optional.empty(), this.checksumParams());
        // Arguments for the LogLoader call below, captured in temporaries by the compiler.
        KafkaScheduler x$4 = this.scheduler();
        MockTime x$5 = this.mockTime();
        ChecksumParams x$13 = this.checksumParams();
        ConcurrentHashMap x$14 = new ConcurrentHashMap();
        boolean x$15 = false;
        // The loader and the local log are given the real scheduler and the mock time.
        LoadedLogOffsets offsets = new LogLoader(logDir, topicPartition, logConfig, (Scheduler)x$4, (Time)x$5, logDirFailureChannel, true, segments, 0L, 0L, leaderEpochCache, producerStateManager, x$14, x$13, x$15).load();
        LocalLog localLog = new LocalLog(logDir, logConfig, segments, offsets.recoveryPoint, offsets.nextOffsetMetadata, (Scheduler)this.scheduler(), (Time)this.mockTime(), topicPartition, logDirFailureChannel, brokerTopicStats, LogOffsetsListener.NO_OP_OFFSETS_LISTENER, this.checksumParams());
        // Unlike the log itself, the tier partition state is given the mock scheduler.
        FileTierPartitionState tierPartitionState = new FileTierPartitionState(logDir, logDirFailureChannel, topicPartition, false, (Scheduler)this.mockTime().scheduler, false, true, (Time)this.mockTime(), TierPartitionStateCleanupConfig.EMPTY, false, -1);
        // Arguments for the MergedLog call below, captured in temporaries by the compiler.
        None$ x$23 = None$.MODULE$;
        TierLogComponents x$26 = TierLogComponents$.MODULE$.EMPTY();
        ChecksumParams x$27 = this.checksumParams();
        None$ x$28 = None$.MODULE$;
        // NOTE(review): presumably constructing the MergedLog is what schedules the producer
        // expiry check on the real scheduler — confirm against MergedLog.
        MergedLog log = new MergedLog(localLog, 0L, true, metrics, leaderEpochCache, producerIdExpirationCheckIntervalMs, producerStateManager, (Option)x$23, true, (TierPartitionState)tierPartitionState, x$26, (Option)x$28, x$27);
        Assertions.assertTrue((boolean)this.scheduler().taskRunning(log.producerExpireCheck()));
        log.close();
        // After close() the scheduler must no longer report the task as running.
        Assertions.assertFalse((boolean)this.scheduler().taskRunning(log.producerExpireCheck()));
    }

    @Timeout(value=15L)
    @Test
    public void testMockSchedulerLocking() {
        CountDownLatch initLatch = new CountDownLatch(1);
        CountDownLatch completionLatch = new CountDownLatch(2);
        .colon.colon taskLatches = new .colon.colon((Object)new CountDownLatch(1), (List)new .colon.colon((Object)new CountDownLatch(1), (List)Nil$.MODULE$));
        this.mockTime().scheduler.scheduleOnce("test1", () -> SchedulerTest.$anonfun$testMockSchedulerLocking$1((List)taskLatches, initLatch, completionLatch), 1L);
        ScheduledExecutorService tickExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            tickExecutor.scheduleWithFixedDelay(() -> this.mockTime().sleep(1L), 0L, 1L, TimeUnit.MILLISECONDS);
            Assertions.assertTrue((boolean)initLatch.await(10L, TimeUnit.SECONDS));
            this.mockTime().scheduler.scheduleOnce("test2", () -> SchedulerTest.$anonfun$testMockSchedulerLocking$3((List)taskLatches, initLatch, completionLatch), 1L);
            taskLatches.foreach((Function1 & Serializable)x$1 -> {
                x$1.countDown();
                return BoxedUnit.UNIT;
            });
            Assertions.assertTrue((boolean)completionLatch.await(10L, TimeUnit.SECONDS), (String)"Tasks did not complete");
        }
        finally {
            tickExecutor.shutdownNow();
        }
    }

    /**
     * Assertion polled by the retry loop in testNonPeriodicTask(): passes once counter1 is 1.
     *
     * @param $this the test instance whose counter is checked
     */
    public static final /* synthetic */ void $anonfun$testNonPeriodicTask$2(SchedulerTest $this) {
        // JUnit's assertEquals takes (expected, actual); with the arguments reversed a failure
        // would report the live counter value as the expected one.
        Assertions.assertEquals(1, $this.counter1().get());
    }

    /**
     * Assertion polled by the retry loop in testNonPeriodicTaskWhenPeriodIsZero(): passes once
     * counter1 is 1.
     *
     * @param $this the test instance whose counter is checked
     */
    public static final /* synthetic */ void $anonfun$testNonPeriodicTaskWhenPeriodIsZero$2(SchedulerTest $this) {
        // JUnit's assertEquals takes (expected, actual); with the arguments reversed a failure
        // would report the live counter value as the expected one.
        Assertions.assertEquals(1, $this.counter1().get());
    }

    /**
     * Assertion polled by the retry loop in testPeriodicTask(): passes once counter1 has
     * reached at least 20.
     *
     * @param $this the test instance whose counter is checked
     */
    public static final /* synthetic */ void $anonfun$testPeriodicTask$2(SchedulerTest $this) {
        boolean reachedTarget = $this.counter1().get() >= 20;
        Assertions.assertTrue(reachedTarget, "Should count to 20");
    }

    /**
     * Task body used by testMockSchedulerLocking(): signals that it has started, waits up to
     * 30 seconds for its own latch to be released, then signals completion.
     *
     * @param taskLatch latch this task blocks on
     * @param initLatch$1 counted down as soon as the task starts
     * @param completionLatch$1 counted down after {@code taskLatch} was released in time
     */
    private static final void scheduledTask$1(CountDownLatch taskLatch, CountDownLatch initLatch$1, CountDownLatch completionLatch$1) {
        initLatch$1.countDown();
        boolean released = taskLatch.await(30L, TimeUnit.SECONDS);
        Assertions.assertTrue(released, "Timed out waiting for latch");
        completionLatch$1.countDown();
    }

    /**
     * Body of the "test1" task in testMockSchedulerLocking(): runs the shared task body with the
     * first latch of the list.
     */
    public static final /* synthetic */ void $anonfun$testMockSchedulerLocking$1(List taskLatches$1, CountDownLatch initLatch$1, CountDownLatch completionLatch$1) {
        CountDownLatch firstLatch = (CountDownLatch)taskLatches$1.head();
        SchedulerTest.scheduledTask$1(firstLatch, initLatch$1, completionLatch$1);
    }

    /**
     * Body of the "test2" task in testMockSchedulerLocking(): runs the shared task body with the
     * latch at index 1 of the list.
     */
    public static final /* synthetic */ void $anonfun$testMockSchedulerLocking$3(List taskLatches$1, CountDownLatch initLatch$1, CountDownLatch completionLatch$1) {
        CountDownLatch secondLatch = (CountDownLatch)taskLatches$1.apply(1);
        SchedulerTest.scheduledTask$1(secondLatch, initLatch$1, completionLatch$1);
    }
}

