/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Predicate;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.failover.flip1.FixedDelayRestartBackoffTimeStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.FlinkException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class ExecutionGraphCoLocationRestartTest {
    @ClassRule
    public static final TestExecutorResource<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorResource();
    private static final int NUM_TASKS = 31;

    @Test
    public void testConstraintsAfterRestart() throws Exception {
        long timeout = 5000L;
        JobVertex groupVertex = ExecutionGraphTestUtils.createNoOpVertex(31);
        JobVertex groupVertex2 = ExecutionGraphTestUtils.createNoOpVertex(31);
        groupVertex2.connectNewDataSetAsInput(groupVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        SlotSharingGroup sharingGroup = new SlotSharingGroup();
        groupVertex.setSlotSharingGroup(sharingGroup);
        groupVertex2.setSlotSharingGroup(sharingGroup);
        groupVertex.setStrictlyCoLocatedWith(groupVertex2);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(groupVertex, groupVertex2);
        ManuallyTriggeredScheduledExecutorService delayExecutor = new ManuallyTriggeredScheduledExecutorService();
        DefaultScheduler scheduler = new DefaultSchedulerBuilder(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor()).setExecutionSlotAllocatorFactory((ExecutionSlotAllocatorFactory)SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(TestingPhysicalSlotProvider.create(ignored -> CompletableFuture.completedFuture(TestingPhysicalSlot.builder().build())))).setDelayExecutor(delayExecutor).setRestartBackoffTimeStrategy(new FixedDelayRestartBackoffTimeStrategy.FixedDelayRestartBackoffTimeStrategyFactory(1, 0L).create()).build();
        ExecutionGraph eg = scheduler.getExecutionGraph();
        Assert.assertEquals((Object)JobStatus.CREATED, (Object)eg.getState());
        scheduler.startScheduling();
        Predicate<AccessExecution> isDeploying = ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING);
        ExecutionGraphTestUtils.waitForAllExecutionsPredicate(eg, isDeploying, 5000L);
        Assert.assertEquals((Object)JobStatus.RUNNING, (Object)eg.getState());
        this.validateConstraints(eg);
        ((ExecutionVertex)eg.getAllExecutionVertices().iterator().next()).fail((Throwable)new FlinkException("Test exception"));
        Assert.assertEquals((Object)JobStatus.RESTARTING, (Object)eg.getState());
        delayExecutor.triggerNonPeriodicScheduledTask();
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            if (vertex.getExecutionState() != ExecutionState.CANCELING) continue;
            vertex.getCurrentExecutionAttempt().completeCancelling();
        }
        ExecutionGraphTestUtils.waitUntilJobStatus(eg, JobStatus.RUNNING, 5000L);
        ExecutionGraphTestUtils.waitForAllExecutionsPredicate(eg, isDeploying, 5000L);
        this.validateConstraints(eg);
        ExecutionGraphTestUtils.finishAllVertices(eg);
        Assert.assertThat((Object)eg.getState(), (Matcher)Matchers.is((Object)JobStatus.FINISHED));
    }

    private void validateConstraints(ExecutionGraph eg) {
        ExecutionJobVertex[] tasks = eg.getAllVertices().values().toArray(new ExecutionJobVertex[2]);
        for (int i = 0; i < 31; ++i) {
            TaskManagerLocation taskManagerLocation0 = tasks[0].getTaskVertices()[i].getCurrentAssignedResourceLocation();
            TaskManagerLocation taskManagerLocation1 = tasks[1].getTaskVertices()[i].getCurrentAssignedResourceLocation();
            Assert.assertThat((Object)taskManagerLocation0, (Matcher)Matchers.is((Object)taskManagerLocation1));
        }
    }
}

