/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.time.Duration;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.scheduler.adaptive.AdaptiveSchedulerTest;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateValidator;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResources;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WaitingForResourcesTest
extends TestLogger {
    /**
     * Stabilization timeout (one second) passed to {@link WaitingForResources} by the tests in
     * this class that do not configure their own stabilization timeout.
     */
    private static final Duration STABILIZATION_TIMEOUT = Duration.ofSeconds(1L);

    /**
     * With the desired resources reported as available, running the tasks scheduled on the
     * context is expected to end in a transition to the "creating execution graph" state.
     */
    @Test
    public void testTransitionToCreatingExecutionGraph() throws Exception {
        try (MockContext context = new MockContext()) {
            context.setExpectCreatingExecutionGraph();
            context.setHasDesiredResources(() -> true);

            // The state is only constructed; the work it scheduled is driven manually below.
            new WaitingForResources(context, log, Duration.ZERO, STABILIZATION_TIMEOUT);

            context.runScheduledTasks();
        }
    }

    /**
     * Notifying the state about new resources while the context reports that the desired
     * resources are missing. No transition expectation is registered on the context.
     */
    @Test
    public void testNotEnoughResources() throws Exception {
        try (MockContext context = new MockContext()) {
            context.setHasDesiredResources(() -> false);

            final WaitingForResources waitingForResources =
                    new WaitingForResources(
                            context, log, Duration.ZERO, STABILIZATION_TIMEOUT);

            // Scheduled tasks are intentionally not run; only the notification is exercised.
            waitingForResources.onNewResourcesAvailable();
        }
    }

    /**
     * The desired resources are missing at construction time and become available afterwards;
     * the subsequent notification is expected to move on to "creating execution graph".
     */
    @Test
    public void testNotifyNewResourcesAvailable() throws Exception {
        try (MockContext context = new MockContext()) {
            context.setHasDesiredResources(() -> false);
            final WaitingForResources waitingForResources =
                    new WaitingForResources(
                            context, log, Duration.ZERO, STABILIZATION_TIMEOUT);

            // Resources show up only after the state has been created.
            context.setHasDesiredResources(() -> true);
            context.setExpectCreatingExecutionGraph();

            waitingForResources.onNewResourcesAvailable();
        }
    }

    /**
     * With a zero stabilization timeout, sufficient (but not desired) resources are expected to
     * be enough for the transition to "creating execution graph" on notification.
     */
    @Test
    public void testSchedulingWithSufficientResourcesAndNoStabilizationTimeout() throws Exception {
        try (MockContext context = new MockContext()) {
            final Duration initialResourceTimeout = Duration.ofSeconds(1000L);
            final Duration zeroStabilizationTimeout = Duration.ofMillis(0L);

            final WaitingForResources waitingForResources =
                    new WaitingForResources(
                            context, log, initialResourceTimeout, zeroStabilizationTimeout);

            context.setHasDesiredResources(() -> false);
            context.setHasSufficientResources(() -> true);
            context.setExpectCreatingExecutionGraph();

            waitingForResources.onNewResourcesAvailable();
        }
    }

    /**
     * With a non-zero stabilization timeout configured, a notification about sufficient (but not
     * desired) resources must not cause a state transition right away.
     */
    @Test
    public void testNoSchedulingIfStabilizationTimeoutIsConfigured() throws Exception {
        try (MockContext context = new MockContext()) {
            final Duration initialResourceTimeout = Duration.ofSeconds(1000L);
            final Duration configuredStabilizationTimeout = Duration.ofMillis(50000L);

            final WaitingForResources waitingForResources =
                    new WaitingForResources(
                            context,
                            log,
                            initialResourceTimeout,
                            configuredStabilizationTimeout);

            context.setHasDesiredResources(() -> false);
            context.setHasSufficientResources(() -> true);

            waitingForResources.onNewResourcesAvailable();

            Assert.assertThat(context.hasStateTransition(), CoreMatchers.is(false));
        }
    }

    /**
     * Sufficient resources are reported, then the manual test time is moved just past the
     * stabilization timeout; afterwards a transition must have been recorded by the context.
     */
    @Test
    public void testSchedulingWithSufficientResourcesAfterStabilizationTimeout() throws Exception {
        try (MockContext context = new MockContext()) {
            final Duration noResourceTimeout = Duration.ofMillis(-1L);
            final Duration stabilization = Duration.ofMillis(50000L);

            // The state uses the context's manual clock so that time is fully test-controlled.
            final WaitingForResources waitingForResources =
                    new WaitingForResources(
                            context,
                            log,
                            noResourceTimeout,
                            stabilization,
                            context.getClock(),
                            null);

            context.setHasDesiredResources(() -> false);
            context.setHasSufficientResources(() -> true);
            waitingForResources.onNewResourcesAvailable();

            context.setExpectCreatingExecutionGraph();

            final long justPastStabilizationMillis = stabilization.plusMillis(1L).toMillis();
            context.advanceTimeByMillis(justPastStabilizationMillis);
            context.runScheduledTasks(justPastStabilizationMillis);

            Assert.assertThat(context.hasStateTransition(), CoreMatchers.is(true));
        }
    }

    /**
     * Toggles the "sufficient resources" answer between notifications while advancing the manual
     * time in 40 ms steps (with a 50 ms stabilization timeout). No transition may have happened
     * after 120 ms or one millisecond later; one must have happened after a further full
     * stabilization timeout has elapsed.
     */
    @Test
    public void testStabilizationTimeoutReset() throws Exception {
        try (MockContext context = new MockContext()) {
            final Duration noResourceTimeout = Duration.ofMillis(-1L);
            final Duration stabilization = Duration.ofMillis(50L);

            final WaitingForResources waitingForResources =
                    new WaitingForResources(
                            context,
                            log,
                            noResourceTimeout,
                            stabilization,
                            context.getClock(),
                            null);

            context.setHasDesiredResources(() -> false);

            // t = 40 ms: sufficient resources are available.
            context.setHasSufficientResources(() -> true);
            context.advanceTimeByMillis(40L);
            waitingForResources.onNewResourcesAvailable();

            // t = 80 ms: sufficient resources are gone again.
            context.setHasSufficientResources(() -> false);
            context.advanceTimeByMillis(40L);
            waitingForResources.onNewResourcesAvailable();

            // t = 120 ms: sufficient resources are back.
            context.setHasSufficientResources(() -> true);
            context.advanceTimeByMillis(40L);
            waitingForResources.onNewResourcesAvailable();

            // More time than one stabilization timeout has passed, yet nothing was scheduled.
            Assert.assertThat(context.hasStateTransition(), CoreMatchers.is(false));
            Assert.assertThat(context.getTestDuration(), Matchers.greaterThan(stabilization));

            context.setExpectCreatingExecutionGraph();

            context.advanceTimeByMillis(1L);
            Assert.assertThat(context.hasStateTransition(), CoreMatchers.is(false));

            context.advanceTimeByMillis(stabilization.toMillis());
            Assert.assertThat(context.hasStateTransition(), CoreMatchers.is(true));
        }
    }

    /**
     * With an initial resource timeout of -1 ms and no desired resources, running all scheduled
     * tasks must not result in a state transition.
     */
    @Test
    public void testNoStateTransitionOnNoResourceTimeout() throws Exception {
        try (MockContext context = new MockContext()) {
            context.setHasDesiredResources(() -> false);

            // Only the construction matters here; the instance itself is not used afterwards.
            new WaitingForResources(
                    context, log, Duration.ofMillis(-1L), STABILIZATION_TIMEOUT);

            context.runScheduledTasks();

            Assert.assertThat(context.hasStateTransition(), CoreMatchers.is(false));
        }
    }

    /**
     * With a zero initial resource timeout and no desired resources, running the scheduled tasks
     * is expected to lead to the "creating execution graph" state.
     */
    @Test
    public void testStateTransitionOnResourceTimeout() throws Exception {
        try (MockContext context = new MockContext()) {
            context.setHasDesiredResources(() -> false);

            // Only the construction matters here; the instance itself is not used afterwards.
            new WaitingForResources(context, log, Duration.ZERO, STABILIZATION_TIMEOUT);

            context.setExpectCreatingExecutionGraph();
            context.runScheduledTasks();
        }
    }

    /**
     * A global failure is expected to finish the job as FAILED, with failure info that contains
     * the message of the reported exception.
     */
    @Test
    public void testTransitionToFinishedOnGlobalFailure() throws Exception {
        final String testExceptionString = "This is a test exception";
        try (MockContext context = new MockContext()) {
            context.setHasDesiredResources(() -> false);
            final WaitingForResources waitingForResources =
                    new WaitingForResources(
                            context, log, Duration.ZERO, STABILIZATION_TIMEOUT);

            context.setExpectFinished(
                    graph -> {
                        Assert.assertThat(graph.getState(), CoreMatchers.is(JobStatus.FAILED));
                        Assert.assertThat(graph.getFailureInfo(), CoreMatchers.notNullValue());
                        Assert.assertTrue(
                                graph.getFailureInfo()
                                        .getExceptionAsString()
                                        .contains(testExceptionString));
                    });

            waitingForResources.handleGlobalFailure(
                    new RuntimeException(testExceptionString),
                    FailureEnricherUtils.EMPTY_FAILURE_LABELS);
        }
    }

    /** Cancelling the state is expected to finish the job with status CANCELED. */
    @Test
    public void testCancel() throws Exception {
        try (MockContext context = new MockContext()) {
            context.setHasDesiredResources(() -> false);
            final WaitingForResources waitingForResources =
                    new WaitingForResources(
                            context, log, Duration.ZERO, STABILIZATION_TIMEOUT);

            context.setExpectFinished(
                    graph ->
                            Assert.assertThat(
                                    graph.getState(), CoreMatchers.is(JobStatus.CANCELED)));

            waitingForResources.cancel();
        }
    }

    /**
     * Suspending the state is expected to finish the job with status SUSPENDED and non-null
     * failure info.
     */
    @Test
    public void testSuspend() throws Exception {
        try (MockContext context = new MockContext()) {
            context.setHasDesiredResources(() -> false);
            final WaitingForResources waitingForResources =
                    new WaitingForResources(
                            context, log, Duration.ZERO, STABILIZATION_TIMEOUT);

            context.setExpectFinished(
                    graph -> {
                        Assert.assertThat(graph.getState(), CoreMatchers.is(JobStatus.SUSPENDED));
                        Assert.assertThat(graph.getFailureInfo(), CoreMatchers.notNullValue());
                    });

            waitingForResources.suspend(new RuntimeException("suspend"));
        }
    }

    /**
     * Self-test of {@link MockContext}: tasks are run in order of their due time, and the two
     * tasks scheduled with the same zero delay are expected to run in scheduling order.
     *
     * <p>The context is opened in try-with-resources like in every other test of this class, so
     * that it is always closed.
     */
    @Test
    public void testInternalRunScheduledTasks_correctExecutionOrder() throws Exception {
        try (MockContext ctx = new MockContext()) {
            final AtomicBoolean firstRun = new AtomicBoolean(false);
            final AtomicBoolean secondRun = new AtomicBoolean(false);
            final AtomicBoolean thirdRun = new AtomicBoolean(false);

            final Runnable runFirstBecauseOfLowDelay = () -> firstRun.set(true);
            final Runnable runSecondBecauseOfScheduleOrder =
                    () -> {
                        if (!firstRun.get()) {
                            Assert.fail("order violated");
                        }
                        secondRun.set(true);
                    };
            final Runnable runLastBecauseOfHighDelay =
                    () -> {
                        if (!secondRun.get()) {
                            Assert.fail("order violated");
                        }
                        thirdRun.set(true);
                    };

            // Deliberately scheduled out of order: the late task goes in first.
            ctx.runIfState(
                    new AdaptiveSchedulerTest.DummyState(),
                    runLastBecauseOfHighDelay,
                    Duration.ofMillis(999L));
            ctx.runIfState(
                    new AdaptiveSchedulerTest.DummyState(),
                    runFirstBecauseOfLowDelay,
                    Duration.ZERO);
            ctx.runIfState(
                    new AdaptiveSchedulerTest.DummyState(),
                    runSecondBecauseOfScheduleOrder,
                    Duration.ZERO);

            ctx.runScheduledTasks();

            // Assert every step on the test thread instead of relying on the last flag only.
            Assert.assertThat(firstRun.get(), CoreMatchers.is(true));
            Assert.assertThat(secondRun.get(), CoreMatchers.is(true));
            Assert.assertThat(thirdRun.get(), CoreMatchers.is(true));
        }
    }

    /**
     * Self-test of {@link MockContext}: a scheduled one-shot task runs on the first call to
     * {@code runScheduledTasks()} and is not run again by a second call.
     *
     * <p>The test explicitly asserts that the task ran; without that check it would also pass if
     * the task were never executed at all.
     */
    @Test
    public void testInternalRunScheduledTasks_tasksAreRemovedAfterExecution() throws Exception {
        try (MockContext ctx = new MockContext()) {
            final AtomicBoolean executed = new AtomicBoolean(false);
            final Runnable executeOnce =
                    () -> {
                        if (executed.get()) {
                            Assert.fail("Multiple executions");
                        }
                        executed.set(true);
                    };

            ctx.runIfState(new AdaptiveSchedulerTest.DummyState(), executeOnce, Duration.ZERO);

            ctx.runScheduledTasks();
            Assert.assertThat(executed.get(), CoreMatchers.is(true));

            // A second pass must find the queue empty; otherwise executeOnce fails the test.
            ctx.runScheduledTasks();
        }
    }

    /**
     * Self-test of {@link MockContext}: a task due at 10 ms must not be run by {@code
     * runScheduledTasks(4)}.
     *
     * <p>The context is opened in try-with-resources like in every other test of this class, so
     * that it is always closed.
     */
    @Test
    public void testInternalRunScheduledTasks_upperBoundRespected() throws Exception {
        try (MockContext ctx = new MockContext()) {
            final Runnable executeNever = () -> Assert.fail("Not expected");

            ctx.runIfState(
                    new AdaptiveSchedulerTest.DummyState(), executeNever, Duration.ofMillis(10L));

            // The bound (4 ms) is below the task's due time (10 ms).
            ctx.runScheduledTasks(4L);
        }
    }

    /**
     * Self-test of {@link MockContext}: a task that is scheduled from within another running
     * task is picked up by the same {@code runScheduledTasks} call when it is due within the
     * given bound.
     *
     * <p>The context is opened in try-with-resources like in every other test of this class, so
     * that it is always closed.
     */
    @Test
    public void testInternalRunScheduledTasks_scheduleTaskFromRunnable() throws Exception {
        try (MockContext ctx = new MockContext()) {
            final AdaptiveSchedulerTest.DummyState state = new AdaptiveSchedulerTest.DummyState();
            final AtomicBoolean executed = new AtomicBoolean(false);

            // The outer task (due immediately) schedules the inner task (due after 4 ms).
            ctx.runIfState(
                    state,
                    () -> ctx.runIfState(state, () -> executed.set(true), Duration.ofMillis(4L)),
                    Duration.ZERO);

            ctx.runScheduledTasks(10L);

            Assert.assertThat(executed.get(), CoreMatchers.is(true));
        }
    }

    /**
     * Creates a consumer that asserts that the value handed to it is not {@code null}.
     *
     * @param <T> type of the consumed value
     * @return consumer performing a Hamcrest not-null assertion on its argument
     */
    static <T> Consumer<T> assertNonNull() {
        return value -> Assert.assertThat(value, CoreMatchers.notNullValue());
    }

    /**
     * Manually driven test time: wraps a {@link ManualClock}, keeps track of the total time the
     * test has advanced, and invokes a callback with that total after every advance.
     */
    private static final class ManualTestTime {
        private static final Logger LOG = LoggerFactory.getLogger(ManualTestTime.class);

        private final ManualClock testingClock = new ManualClock();

        /** Callback that receives the accumulated test duration after each advance. */
        private final Consumer<Duration> runOnAdvance;

        private Duration durationSinceTestStart = Duration.ZERO;

        private ManualTestTime(Consumer<Duration> runOnAdvance) {
            this.runOnAdvance = runOnAdvance;
        }

        private Clock getClock() {
            return testingClock;
        }

        /**
         * Moves the test time forward and then notifies the callback.
         *
         * @param millis number of milliseconds to advance the clock by
         */
        public void advanceMillis(long millis) {
            final Duration elapsed = durationSinceTestStart.plusMillis(millis);
            durationSinceTestStart = elapsed;
            LOG.info("Advance testing time by {} ms to time {} ms", millis, elapsed.toMillis());

            // The clock is moved before the callback runs so the callback observes the new time.
            testingClock.advanceTime(millis, TimeUnit.MILLISECONDS);
            runOnAdvance.accept(elapsed);
        }

        /** Returns the total duration by which this test time has been advanced so far. */
        public Duration getTestDuration() {
            return durationSinceTestStart;
        }
    }

    /**
     * Test implementation of {@link WaitingForResources.Context}.
     *
     * <p>Resource availability is answered by replaceable suppliers, state transitions are
     * checked through {@link StateValidator}s, and work passed to {@link #runIfState} is queued
     * and only executed when the test runs the scheduled tasks or advances the manual test time.
     *
     * <p>Instances should be used in try-with-resources: {@link #close()} closes both validators.
     */
    private static class MockContext implements WaitingForResources.Context, AutoCloseable {

        private static final Logger LOG = LoggerFactory.getLogger(MockContext.class);

        private final StateValidator<Void> creatingExecutionGraphStateValidator =
                new StateValidator<>("executing");
        private final StateValidator<ArchivedExecutionGraph> finishedStateValidator =
                new StateValidator<>("finished");

        private Supplier<Boolean> hasDesiredResourcesSupplier = () -> false;
        private Supplier<Boolean> hasSufficientResourcesSupplier = () -> false;

        /**
         * Pending tasks ordered by due time. The value stored as a task's "delay" is the
         * manual-clock time at which it becomes due (see {@link #runIfState}).
         *
         * <p>NOTE(review): {@link PriorityQueue} does not promise FIFO order among tasks with an
         * equal due time.
         */
        private final Queue<ScheduledTask<Void>> scheduledTasks =
                new PriorityQueue<>(
                        Comparator.comparingLong(task -> task.getDelay(TimeUnit.MILLISECONDS)));

        /** Set once either {@code goTo...} method has been called. */
        private boolean hasStateTransition = false;

        /** Advancing the test time runs every task that is due at the new time. */
        private final ManualTestTime testTime =
                new ManualTestTime(
                        durationSinceTestStart ->
                                runScheduledTasks(durationSinceTestStart.toMillis()));

        private MockContext() {}

        /** Replaces the supplier answering {@link #hasDesiredResources()}. */
        public void setHasDesiredResources(Supplier<Boolean> sup) {
            this.hasDesiredResourcesSupplier = sup;
        }

        /** Replaces the supplier answering {@link #hasSufficientResources()}. */
        public void setHasSufficientResources(Supplier<Boolean> sup) {
            this.hasSufficientResourcesSupplier = sup;
        }

        /** Registers an expected transition to "finished", checked with the given asserter. */
        void setExpectFinished(Consumer<ArchivedExecutionGraph> asserter) {
            finishedStateValidator.expectInput(asserter);
        }

        /** Registers an expected transition to "creating execution graph". */
        void setExpectCreatingExecutionGraph() {
            creatingExecutionGraphStateValidator.expectInput(none -> {});
        }

        /**
         * Runs, in queue order, every pending task whose due time is at most {@code untilDelay}.
         * Tasks that get scheduled while this method runs are considered as well.
         *
         * @param untilDelay inclusive upper bound for the due time, in milliseconds
         */
        void runScheduledTasks(long untilDelay) {
            LOG.info("Running scheduled tasks with a delay between 0 and {}ms:", untilDelay);

            ScheduledTask<Void> next;
            while ((next = scheduledTasks.peek()) != null
                    && next.getDelay(TimeUnit.MILLISECONDS) <= untilDelay) {
                // Remove the task before executing it, since executing may enqueue new tasks.
                scheduledTasks.poll();

                LOG.info("Running task with delay {}", next.getDelay(TimeUnit.MILLISECONDS));
                next.execute();

                if (next.isPeriodic()) {
                    // Periodic tasks are put back into the queue after each execution.
                    scheduledTasks.add(next);
                }
            }
        }

        /** Runs all pending tasks, regardless of their due time. */
        void runScheduledTasks() {
            runScheduledTasks(Long.MAX_VALUE);
        }

        /** Closes both state validators. */
        @Override
        public void close() throws Exception {
            creatingExecutionGraphStateValidator.close();
            finishedStateValidator.close();
        }

        /**
         * Builds an archived execution graph in the given state.
         *
         * @param jobStatus state to set on the archived graph
         * @param cause failure cause, or {@code null} for a graph without failure info
         */
        public ArchivedExecutionGraph getArchivedExecutionGraph(
                JobStatus jobStatus, @Nullable Throwable cause) {
            final ErrorInfo failureInfo = cause == null ? null : new ErrorInfo(cause, 1337L);
            return new ArchivedExecutionGraphBuilder()
                    .setState(jobStatus)
                    .setFailureCause(failureInfo)
                    .build();
        }

        public boolean hasDesiredResources() {
            return hasDesiredResourcesSupplier.get();
        }

        public boolean hasSufficientResources() {
            return hasSufficientResourcesSupplier.get();
        }

        /**
         * Queues {@code action} to become due {@code delay} after the current manual-clock time.
         * When the task is eventually executed, the action is skipped if a state transition has
         * been recorded in the meantime. The {@code expectedState} argument is not inspected.
         *
         * @return the queued task
         */
        public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) {
            final long dueTimeMillis =
                    testTime.getClock().absoluteTimeMillis() + delay.toMillis();
            LOG.info(
                    "Scheduling work with delay {} for earliest execution at {}",
                    delay.toMillis(),
                    dueTimeMillis);

            final ScheduledTask<Void> scheduledTask =
                    new ScheduledTask<>(
                            () -> {
                                if (!hasStateTransition) {
                                    action.run();
                                }
                                return null;
                            },
                            dueTimeMillis);
            scheduledTasks.add(scheduledTask);
            return scheduledTask;
        }

        public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
            finishedStateValidator.validateInput(archivedExecutionGraph);
            hasStateTransition = true;
        }

        public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph) {
            // The previous execution graph is ignored; the validator carries no payload.
            creatingExecutionGraphStateValidator.validateInput(null);
            hasStateTransition = true;
        }

        /** Returns whether either {@code goTo...} method has been called on this context. */
        public boolean hasStateTransition() {
            return hasStateTransition;
        }

        /** Returns the manually controlled clock backing this context. */
        public Clock getClock() {
            return testTime.getClock();
        }

        /** Advances the manual test time; tasks that are due afterwards are run. */
        public void advanceTimeByMillis(long millis) {
            testTime.advanceMillis(millis);
        }

        /** Returns the total time by which the test time has been advanced so far. */
        public Duration getTestDuration() {
            return testTime.getTestDuration();
        }
    }
}

