/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.strategy;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.scheduler.DeploymentOption;
import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
import org.apache.flink.runtime.scheduler.SchedulerOperations;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.InputDependencyConstraintChecker;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategy;
import org.apache.flink.runtime.scheduler.strategy.SchedulingStrategyFactory;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.util.Preconditions;

public class LazyFromSourcesSchedulingStrategy
implements SchedulingStrategy {
    private static final Predicate<SchedulingExecutionVertex> IS_IN_CREATED_EXECUTION_STATE = schedulingExecutionVertex -> ExecutionState.CREATED == schedulingExecutionVertex.getState();
    private final SchedulerOperations schedulerOperations;
    private final SchedulingTopology schedulingTopology;
    private final Map<ExecutionVertexID, DeploymentOption> deploymentOptions;
    private final InputDependencyConstraintChecker inputConstraintChecker;

    public LazyFromSourcesSchedulingStrategy(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology) {
        this.schedulerOperations = (SchedulerOperations)Preconditions.checkNotNull((Object)schedulerOperations);
        this.schedulingTopology = (SchedulingTopology)Preconditions.checkNotNull((Object)schedulingTopology);
        this.deploymentOptions = new HashMap<ExecutionVertexID, DeploymentOption>();
        this.inputConstraintChecker = new InputDependencyConstraintChecker();
    }

    @Override
    public void startScheduling() {
        DeploymentOption updateOption = new DeploymentOption(true);
        DeploymentOption nonUpdateOption = new DeploymentOption(false);
        for (SchedulingExecutionVertex schedulingVertex : this.schedulingTopology.getVertices()) {
            DeploymentOption option = nonUpdateOption;
            for (SchedulingResultPartition srp : schedulingVertex.getProducedResultPartitions()) {
                if (srp.getPartitionType().isPipelined()) {
                    option = updateOption;
                }
                this.inputConstraintChecker.addSchedulingResultPartition(srp);
            }
            this.deploymentOptions.put(schedulingVertex.getId(), option);
        }
        this.allocateSlotsAndDeployExecutionVertexIds(this.getAllVerticesFromTopology());
    }

    @Override
    public void restartTasks(Set<ExecutionVertexID> verticesToRestart) {
        verticesToRestart.stream().map(this.schedulingTopology::getVertexOrThrow).flatMap(vertex -> vertex.getProducedResultPartitions().stream()).forEach(this.inputConstraintChecker::resetSchedulingResultPartition);
        this.allocateSlotsAndDeployExecutionVertexIds(verticesToRestart);
    }

    @Override
    public void onExecutionStateChange(ExecutionVertexID executionVertexId, ExecutionState executionState) {
        if (!ExecutionState.FINISHED.equals((Object)executionState)) {
            return;
        }
        Set<SchedulingExecutionVertex> verticesToSchedule = this.schedulingTopology.getVertexOrThrow(executionVertexId).getProducedResultPartitions().stream().flatMap(partition -> this.inputConstraintChecker.markSchedulingResultPartitionFinished((SchedulingResultPartition)partition).stream()).flatMap(partition -> partition.getConsumers().stream()).collect(Collectors.toSet());
        this.allocateSlotsAndDeployExecutionVertices(verticesToSchedule);
    }

    @Override
    public void onPartitionConsumable(ExecutionVertexID executionVertexId, ResultPartitionID resultPartitionId) {
        SchedulingResultPartition resultPartition = this.schedulingTopology.getResultPartitionOrThrow(resultPartitionId.getPartitionId());
        if (!resultPartition.getPartitionType().isPipelined()) {
            return;
        }
        SchedulingExecutionVertex producerVertex = this.schedulingTopology.getVertexOrThrow(executionVertexId);
        if (!producerVertex.getProducedResultPartitions().contains(resultPartition)) {
            throw new IllegalStateException("partition " + resultPartitionId + " is not the produced partition of " + executionVertexId);
        }
        this.allocateSlotsAndDeployExecutionVertices(resultPartition.getConsumers());
    }

    private void allocateSlotsAndDeployExecutionVertexIds(Set<ExecutionVertexID> verticesToSchedule) {
        this.allocateSlotsAndDeployExecutionVertices(verticesToSchedule.stream().map(this.schedulingTopology::getVertexOrThrow).collect(Collectors.toList()));
    }

    private void allocateSlotsAndDeployExecutionVertices(Collection<SchedulingExecutionVertex> schedulingExecutionVertices) {
        this.schedulerOperations.allocateSlotsAndDeploy(schedulingExecutionVertices.stream().filter(this.isInputConstraintSatisfied().and(IS_IN_CREATED_EXECUTION_STATE)).map(SchedulingExecutionVertex::getId).map(executionVertexID -> new ExecutionVertexDeploymentOption((ExecutionVertexID)executionVertexID, this.deploymentOptions.get(executionVertexID))).collect(Collectors.toSet()));
    }

    private Predicate<SchedulingExecutionVertex> isInputConstraintSatisfied() {
        return this.inputConstraintChecker::check;
    }

    private Set<ExecutionVertexID> getAllVerticesFromTopology() {
        return StreamSupport.stream(this.schedulingTopology.getVertices().spliterator(), false).map(SchedulingExecutionVertex::getId).collect(Collectors.toSet());
    }

    public static class Factory
    implements SchedulingStrategyFactory {
        @Override
        public SchedulingStrategy createInstance(SchedulerOperations schedulerOperations, SchedulingTopology schedulingTopology, JobGraph jobGraph) {
            return new LazyFromSourcesSchedulingStrategy(schedulerOperations, schedulingTopology);
        }
    }
}

