/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Failover strategy that reacts to a task failure by restarting only the failed task's
 * execution vertex.
 *
 * <p>The strategy rejects (see {@link #notifyNewVertices(List)}) any job in which a job vertex
 * has inputs or produced data sets, i.e. it only accepts jobs made of disconnected tasks.
 * Failures caused by a {@link NoResourceAvailableException}, and any unexpected error while
 * restarting a vertex, are escalated to {@link ExecutionGraph#failGlobal(Throwable)}.
 */
public class RestartIndividualStrategy
extends FailoverStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(RestartIndividualStrategy.class);

    /** The execution graph this strategy operates on. */
    private final ExecutionGraph executionGraph;

    /** Counts the task failures that were handled via an individual restart. */
    private final SimpleCounter numTaskFailures;

    /**
     * Creates a new strategy bound to the given execution graph.
     *
     * @param executionGraph the execution graph on which failures are handled; must not be null
     */
    public RestartIndividualStrategy(ExecutionGraph executionGraph) {
        this.executionGraph = Preconditions.checkNotNull(executionGraph);
        this.numTaskFailures = new SimpleCounter();
    }

    /**
     * Handles the failure of a single task execution.
     *
     * <p>If the failure cause is a {@link NoResourceAvailableException}, the whole graph is
     * failed globally. Otherwise the failure counter is incremented and a restart of the
     * execution's vertex is chained onto the execution's terminal state future.
     *
     * @param taskExecution the execution that failed
     * @param cause the reason for the failure
     */
    @Override
    public void onTaskFailure(Execution taskExecution, Throwable cause) {
        this.executionGraph.getJobMasterMainThreadExecutor().assertRunningInMainThread();

        if (cause instanceof NoResourceAvailableException) {
            LOG.info("Not enough resources to schedule {} - triggering full recovery.", taskExecution);
            this.executionGraph.failGlobal(cause);
            return;
        }

        LOG.info(
            "Recovering task failure for {} (#{}) via individual restart.",
            taskExecution.getVertex().getTaskNameWithSubtaskIndex(),
            taskExecution.getAttemptNumber());
        this.numTaskFailures.inc();

        // The restart is only attempted after the failed execution's terminal state future
        // completes normally.
        CompletableFuture<ExecutionState> terminationFuture = taskExecution.getTerminalStateFuture();
        terminationFuture.thenRun(
            () -> this.performExecutionVertexRestart(
                taskExecution.getVertex(), taskExecution.getGlobalModVersion()));
    }

    /**
     * Resets the given vertex for a new execution attempt and schedules that attempt.
     *
     * <p>A {@link GlobalModVersionMismatch} is deliberately not escalated; any other exception
     * triggers a global failure of the execution graph.
     *
     * @param vertexToRecover the vertex to reset and reschedule
     * @param globalModVersion the global modification version captured from the failed execution
     */
    protected void performExecutionVertexRestart(ExecutionVertex vertexToRecover, long globalModVersion) {
        try {
            long createTimestamp = System.currentTimeMillis();
            Execution newExecution = vertexToRecover.resetForNewExecution(createTimestamp, globalModVersion);
            newExecution.scheduleForExecution();
        }
        catch (GlobalModVersionMismatch mismatch) {
            // Intentionally not escalated. NOTE(review): presumably the graph's global
            // modification version moved on (e.g. a concurrent global recovery) since the
            // failed execution captured it, so this individual restart is obsolete - confirm.
            LOG.debug(
                "Skipping individual restart of {}: {}",
                vertexToRecover.getTaskNameWithSubtaskIndex(),
                mismatch.getMessage());
        }
        catch (Exception e) {
            this.executionGraph.failGlobal(new Exception("Error during fine grained recovery - triggering full recovery", e));
        }
    }

    /**
     * Validates newly added job vertices against this strategy's restriction.
     *
     * @param newJobVerticesTopological the newly added job vertices
     * @throws FlinkRuntimeException if any vertex has at least one input or produced data set
     */
    @Override
    public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
        for (ExecutionJobVertex ejv : newJobVerticesTopological) {
            List<IntermediateResult> inputs = ejv.getInputs();
            IntermediateResult[] outputs = ejv.getProducedDataSets();

            boolean hasInputs = inputs != null && !inputs.isEmpty();
            boolean hasOutputs = outputs != null && outputs.length > 0;
            if (hasInputs || hasOutputs) {
                throw new FlinkRuntimeException("Incompatible failover strategy - strategy '" + this.getStrategyName() + "' can only handle jobs with only disconnected tasks.");
            }
        }
    }

    /**
     * Returns the display name of this strategy.
     *
     * @return the constant strategy name
     */
    @Override
    public String getStrategyName() {
        return "Individual Task Restart";
    }

    /**
     * Registers the task failure counter under the name {@code task_failures}.
     *
     * @param metricGroup the metric group to register the counter with
     */
    @Override
    public void registerMetrics(MetricGroup metricGroup) {
        metricGroup.counter("task_failures", this.numTaskFailures);
    }

    /** Factory that creates {@link RestartIndividualStrategy} instances. */
    public static class Factory
    implements FailoverStrategy.Factory {
        /**
         * Creates a strategy for the given execution graph.
         *
         * @param executionGraph the execution graph the new strategy will operate on
         * @return a new {@link RestartIndividualStrategy}
         */
        @Override
        public RestartIndividualStrategy create(ExecutionGraph executionGraph) {
            return new RestartIndividualStrategy(executionGraph);
        }
    }
}

