/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Failover strategy that groups the execution vertices of a job into failover
 * regions and, when a task fails, delegates recovery to the region that
 * contains the failed task.
 *
 * <p>Regions are (re)computed whenever new job vertices are announced via
 * {@link #notifyNewVertices(List)}. Vertices that are linked through a
 * pipelined intermediate result end up in the same region. If any job vertex
 * has a co-location group, all announced vertices are placed into one single
 * region.
 */
public class RestartPipelinedRegionStrategy
extends FailoverStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(RestartPipelinedRegionStrategy.class);

    /** The execution graph on which this strategy operates. */
    private final ExecutionGraph executionGraph;

    /** Lookup from each known execution vertex to the failover region that contains it. */
    private final Map<ExecutionVertex, FailoverRegion> vertexToRegion;

    /**
     * Creates a strategy for the given execution graph. No regions exist until
     * {@link #notifyNewVertices(List)} has been called.
     *
     * @param executionGraph the graph to operate on; must not be {@code null}
     */
    public RestartPipelinedRegionStrategy(ExecutionGraph executionGraph) {
        this.executionGraph = Preconditions.checkNotNull(executionGraph);
        this.vertexToRegion = new HashMap<>();
    }

    /**
     * Handles the failure of a task execution by forwarding it to the failover
     * region of the task's vertex. If no region is registered for that vertex,
     * the whole execution graph is failed globally instead.
     *
     * @param taskExecution the execution attempt that failed
     * @param cause the reason for the failure
     */
    @Override
    public void onTaskFailure(Execution taskExecution, Throwable cause) {
        final ExecutionVertex ev = taskExecution.getVertex();
        final FailoverRegion failoverRegion = this.vertexToRegion.get(ev);

        if (failoverRegion == null) {
            // Without a region there is nothing to restart locally; escalate to a global failure.
            this.executionGraph.failGlobal(new FlinkException(
                    "Can not find a failover region for the execution " + ev.getTaskNameWithSubtaskIndex(), cause));
        } else {
            LOG.info("Recovering task failure for {} #{} ({}) via restart of failover region",
                    ev.getTaskNameWithSubtaskIndex(),
                    taskExecution.getAttemptNumber(),
                    taskExecution.getAttemptId());
            failoverRegion.onExecutionFail(taskExecution, cause);
        }
    }

    /**
     * Builds the failover regions for the newly announced job vertices.
     *
     * @param newJobVerticesTopological the new job vertices; the region construction
     *     looks up the region of each pipelined predecessor, so producers are expected
     *     to appear before their consumers
     */
    @Override
    public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
        this.generateAllFailoverRegion(newJobVerticesTopological);
    }

    /** Returns the display name of this strategy. */
    @Override
    public String getStrategyName() {
        return "Pipelined Region Failover";
    }

    /**
     * Partitions the execution vertices of the given job vertices into regions and
     * registers a {@link FailoverRegion} for every vertex.
     *
     * <p>A job vertex without pipelined inputs starts one new single-vertex region
     * per subtask. A subtask with pipelined inputs joins the region of its first
     * pipelined predecessor, and every further pipelined predecessor that lives in a
     * different region causes the two regions to be merged.
     *
     * @param newJobVerticesTopological the job vertices to partition
     * @throws FlinkRuntimeException if a pipelined input edge has no producer vertex
     */
    private void generateAllFailoverRegion(List<ExecutionJobVertex> newJobVerticesTopological) {
        // Working state: the (still mutable) member list of the region each vertex belongs to.
        final IdentityHashMap<ExecutionVertex, ArrayList<ExecutionVertex>> vertexToMembers = new IdentityHashMap<>();

        // Used as an identity-based set (list -> null): two member lists with equal
        // content must still count as different regions.
        final IdentityHashMap<ArrayList<ExecutionVertex>, Object> distinctRegions = new IdentityHashMap<>();

        for (ExecutionJobVertex ejv : newJobVerticesTopological) {
            // A co-location group anywhere makes the whole set of vertices one region.
            if (ejv.getCoLocationGroup() != null) {
                this.makeAllOneRegion(newJobVerticesTopological);
                return;
            }

            final List<IntermediateResult> inputs = ejv.getInputs();
            final int numInputs = inputs.size();

            boolean hasPipelinedInputs = false;
            for (IntermediateResult input : inputs) {
                if (input.getResultType().isPipelined()) {
                    hasPipelinedInputs = true;
                    break;
                }
            }

            if (hasPipelinedInputs) {
                // Attach every subtask to the region(s) of its pipelined predecessors.
                for (ExecutionVertex ev : ejv.getTaskVertices()) {
                    // The region this subtask has been assigned to so far.
                    ArrayList<ExecutionVertex> thisRegion = null;

                    for (int inputNum = 0; inputNum < numInputs; ++inputNum) {
                        if (!inputs.get(inputNum).getResultType().isPipelined()) {
                            continue;
                        }

                        for (ExecutionEdge edge : ev.getInputEdges(inputNum)) {
                            final ExecutionVertex predecessor = edge.getSource().getProducer();
                            final ArrayList<ExecutionVertex> predecessorRegion = vertexToMembers.get(predecessor);

                            if (thisRegion != null) {
                                if (predecessorRegion != thisRegion) {
                                    // Different regions meet at this subtask: fold ours into the
                                    // predecessor's and re-point all members at the merged list.
                                    predecessorRegion.addAll(thisRegion);
                                    distinctRegions.remove(thisRegion);
                                    thisRegion = predecessorRegion;

                                    for (ExecutionVertex inPredRegion : predecessorRegion) {
                                        vertexToMembers.put(inPredRegion, thisRegion);
                                    }
                                }
                            } else if (predecessor != null) {
                                // First pipelined predecessor: adopt its region.
                                thisRegion = predecessorRegion;
                                thisRegion.add(ev);
                                vertexToMembers.put(ev, thisRegion);
                            } else {
                                // Not a recoverable situation; signals a defect in the construction logic.
                                throw new FlinkRuntimeException("bug in the logic to construct the pipelined failover regions");
                            }
                        }
                    }
                }
            } else {
                // No pipelined inputs: every subtask starts a region of its own.
                for (ExecutionVertex ev : ejv.getTaskVertices()) {
                    final ArrayList<ExecutionVertex> region = new ArrayList<>(1);
                    region.add(ev);
                    vertexToMembers.put(ev, region);
                    distinctRegions.put(region, null);
                }
            }
        }

        LOG.info("Creating {} individual failover regions for job {} ({})",
                distinctRegions.size(), this.executionGraph.getJobName(), this.executionGraph.getJobID());

        // Turn every surviving member list into a FailoverRegion and publish the mapping.
        for (List<ExecutionVertex> region : distinctRegions.keySet()) {
            final FailoverRegion failoverRegion = this.createFailoverRegion(this.executionGraph, region);
            for (ExecutionVertex ev : region) {
                this.vertexToRegion.put(ev, failoverRegion);
            }
        }
    }

    /**
     * Puts all execution vertices of the given job vertices into one single
     * failover region and registers that region for each of them.
     *
     * @param jobVertices the job vertices whose subtasks form the single region
     */
    private void makeAllOneRegion(List<ExecutionJobVertex> jobVertices) {
        LOG.warn("Cannot decompose ExecutionGraph into individual failover regions due to use of Co-Location constraints (iterations). Job will fail over as one holistic unit.");

        final ArrayList<ExecutionVertex> allVertices = new ArrayList<>();
        for (ExecutionJobVertex ejv : jobVertices) {
            // Grow once per job vertex instead of incrementally while adding its subtasks.
            allVertices.ensureCapacity(allVertices.size() + ejv.getParallelism());
            allVertices.addAll(Arrays.asList(ejv.getTaskVertices()));
        }

        final FailoverRegion singleRegion = this.createFailoverRegion(this.executionGraph, allVertices);
        for (ExecutionVertex ev : allVertices) {
            this.vertexToRegion.put(ev, singleRegion);
        }
    }

    /**
     * Creates the failover region for a group of connected execution vertices.
     *
     * @param eg the execution graph the region belongs to
     * @param connectedExecutions the execution vertices that make up the region
     * @return the newly created region
     */
    @VisibleForTesting
    protected FailoverRegion createFailoverRegion(ExecutionGraph eg, List<ExecutionVertex> connectedExecutions) {
        final Map<JobVertexID, ExecutionJobVertex> tasks = this.initTasks(connectedExecutions);
        return new FailoverRegion(eg, connectedExecutions, tasks);
    }

    /**
     * Collects the job vertices that the given execution vertices belong to.
     *
     * @param connectedExecutions the execution vertices of one region
     * @return a map from job vertex ID to job vertex; when several execution vertices
     *     share an ID, the job vertex of the first one encountered is kept
     */
    @VisibleForTesting
    protected Map<JobVertexID, ExecutionJobVertex> initTasks(List<ExecutionVertex> connectedExecutions) {
        final Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>(connectedExecutions.size());
        for (ExecutionVertex executionVertex : connectedExecutions) {
            tasks.putIfAbsent(executionVertex.getJobvertexId(), executionVertex.getJobVertex());
        }
        return tasks;
    }

    /**
     * Looks up the failover region registered for an execution vertex.
     *
     * @param ev the execution vertex to look up
     * @return the region containing the vertex, or {@code null} if none is registered
     */
    @VisibleForTesting
    public FailoverRegion getFailoverRegion(ExecutionVertex ev) {
        return this.vertexToRegion.get(ev);
    }

    /** Factory that creates {@link RestartPipelinedRegionStrategy} instances. */
    public static class Factory
    implements FailoverStrategy.Factory {
        /**
         * Creates a new pipelined-region failover strategy for the given graph.
         *
         * @param executionGraph the execution graph the strategy will operate on
         * @return the new strategy instance
         */
        @Override
        public FailoverStrategy create(ExecutionGraph executionGraph) {
            return new RestartPipelinedRegionStrategy(executionGraph);
        }
    }
}

