/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializer;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointSerializers;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for writing, reading, validating and disposing checkpoint / savepoint
 * metadata.
 *
 * <p>A serialized metadata stream starts with {@link #HEADER_MAGIC_NUMBER}, followed by the
 * metadata version, followed by whatever the {@link SavepointSerializer} registered for that
 * version writes.
 *
 * <p>This class is a pure utility holder and cannot be instantiated.
 */
public class Checkpoints {
    private static final Logger LOG = LoggerFactory.getLogger(Checkpoints.class);

    /** Marker written as the first {@code int} of every metadata stream. */
    public static final int HEADER_MAGIC_NUMBER = 1231054637;

    /**
     * Writes the given metadata to a plain {@link OutputStream}.
     *
     * <p>The stream is wrapped in a {@link DataOutputStream} that is not closed here, so the
     * caller keeps ownership of {@code out}.
     *
     * @param checkpointMetadata the metadata to serialize
     * @param out the stream to write to
     * @throws IOException if writing to the stream fails
     * @throws NullPointerException if either argument is {@code null}
     */
    public static <T extends Savepoint> void storeCheckpointMetadata(T checkpointMetadata, OutputStream out) throws IOException {
        Preconditions.checkNotNull(checkpointMetadata, "checkpointMetadata");
        Preconditions.checkNotNull(out, "out");
        storeCheckpointMetadata(checkpointMetadata, new DataOutputStream(out));
    }

    /**
     * Writes the header (magic number, then version) followed by the serialized metadata.
     *
     * @param checkpointMetadata the metadata to serialize
     * @param out the stream to write to
     * @throws IOException if writing to the stream fails
     * @throws NullPointerException if either argument is {@code null}
     */
    public static <T extends Savepoint> void storeCheckpointMetadata(T checkpointMetadata, DataOutputStream out) throws IOException {
        // Validate before the first write so that a null argument cannot leave a dangling
        // magic number in the target stream.
        Preconditions.checkNotNull(checkpointMetadata, "checkpointMetadata");
        Preconditions.checkNotNull(out, "out");

        out.writeInt(HEADER_MAGIC_NUMBER);
        out.writeInt(checkpointMetadata.getVersion());

        SavepointSerializer<T> serializer = SavepointSerializers.getSerializer(checkpointMetadata);
        serializer.serialize(checkpointMetadata, out);
    }

    /**
     * Reads metadata previously written by {@code storeCheckpointMetadata}.
     *
     * <p>The first {@code int} must equal {@link #HEADER_MAGIC_NUMBER}; the second {@code int}
     * is the version used to look up the deserializer.
     *
     * @param in the stream positioned at the start of the metadata
     * @param classLoader class loader handed to the deserializer
     * @return the deserialized metadata
     * @throws IOException if the magic number does not match, if no serializer is registered
     *     for the version found in the stream, or if reading fails
     */
    public static Savepoint loadCheckpointMetadata(DataInputStream in, ClassLoader classLoader) throws IOException {
        Preconditions.checkNotNull(in, "input stream");
        Preconditions.checkNotNull(classLoader, "classLoader");

        int magicNumber = in.readInt();
        if (magicNumber != HEADER_MAGIC_NUMBER) {
            throw new IOException("Unexpected magic number. This can have multiple reasons: (1) You are trying to load a Flink 1.0 savepoint, which is not supported by this version of Flink. (2) The file you were pointing to is not a savepoint at all. (3) The savepoint file has been corrupted.");
        }

        int version = in.readInt();
        SavepointSerializer<?> serializer = SavepointSerializers.getSerializer(version);
        if (serializer == null) {
            throw new IOException("Unrecognized checkpoint version number: " + version);
        }
        return serializer.deserialize(in, classLoader);
    }

    /**
     * Loads the metadata stored at {@code location} and checks that its operator states can be
     * mapped onto the given job vertices.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Read the metadata from the location's metadata handle.</li>
     *   <li>If the metadata reports non-null task states, convert it with
     *       {@link SavepointV2#convertToOperatorStateSavepointV2}.</li>
     *   <li>For every operator state, look up the owning {@link ExecutionJobVertex} by
     *       operator ID. On the first miss the lookup table is replaced once by the result of
     *       {@link ExecutionJobVertex#includeAlternativeOperatorIDs} and the lookup is
     *       retried.</li>
     *   <li>A matched state is accepted when the max parallelism values are equal or the
     *       vertex reports that its max parallelism is not configured; otherwise the load
     *       fails.</li>
     *   <li>An unmatched state is skipped when {@code allowNonRestoredState} is set or when
     *       none of its subtask states has state; otherwise the load fails.</li>
     * </ol>
     *
     * @param jobId ID of the job the checkpoint is loaded for
     * @param tasks job vertices of the program to map the state onto
     * @param location storage location holding the metadata
     * @param classLoader class loader handed to the metadata deserializer
     * @param allowNonRestoredState whether state without a matching operator may be dropped
     * @return a {@link CompletedCheckpoint} containing the accepted operator states
     * @throws IOException if the metadata cannot be read
     * @throws IllegalStateException if the max parallelism does not match, or if state cannot
     *     be mapped and may not be skipped
     */
    public static CompletedCheckpoint loadAndValidateCheckpoint(JobID jobId, Map<JobVertexID, ExecutionJobVertex> tasks, CompletedCheckpointStorageLocation location, ClassLoader classLoader, boolean allowNonRestoredState) throws IOException {
        Preconditions.checkNotNull(jobId, "jobId");
        Preconditions.checkNotNull(tasks, "tasks");
        Preconditions.checkNotNull(location, "location");
        Preconditions.checkNotNull(classLoader, "classLoader");

        StreamStateHandle metadataHandle = location.getMetadataHandle();
        String checkpointPointer = location.getExternalPointer();

        Savepoint rawCheckpointMetadata;
        try (FSDataInputStream in = metadataHandle.openInputStream()) {
            DataInputStream dis = new DataInputStream(in);
            rawCheckpointMetadata = loadCheckpointMetadata(dis, classLoader);
        }

        // Metadata that still carries task states is converted to the operator-state form.
        Savepoint checkpointMetadata = rawCheckpointMetadata.getTaskStates() == null
                ? rawCheckpointMetadata
                : SavepointV2.convertToOperatorStateSavepointV2(tasks, rawCheckpointMetadata);

        // Index every operator ID of the new program by the job vertex that contains it.
        Map<OperatorID, ExecutionJobVertex> operatorToJobVertexMapping = new HashMap<>();
        for (ExecutionJobVertex task : tasks.values()) {
            for (OperatorID operatorID : task.getOperatorIDs()) {
                operatorToJobVertexMapping.put(operatorID, task);
            }
        }

        boolean expandedToLegacyIds = false;
        HashMap<OperatorID, OperatorState> operatorStates = new HashMap<>(checkpointMetadata.getOperatorStates().size());

        for (OperatorState operatorState : checkpointMetadata.getOperatorStates()) {
            ExecutionJobVertex executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());

            // The lookup table is widened at most once, on the first miss; later iterations
            // reuse the widened table.
            if (executionJobVertex == null && !expandedToLegacyIds) {
                operatorToJobVertexMapping = ExecutionJobVertex.includeAlternativeOperatorIDs(operatorToJobVertexMapping);
                executionJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
                expandedToLegacyIds = true;
                LOG.info("Could not find ExecutionJobVertex. Including user-defined OperatorIDs in search.");
            }

            if (executionJobVertex != null) {
                boolean sameMaxParallelism = executionJobVertex.getMaxParallelism() == operatorState.getMaxParallelism();
                if (sameMaxParallelism || !executionJobVertex.isMaxParallelismConfigured()) {
                    operatorStates.put(operatorState.getOperatorID(), operatorState);
                    continue;
                }
                // Report the external pointer, as the "operator not available" message below
                // does, rather than the string form of the metadata object.
                String msg = String.format("Failed to rollback to checkpoint/savepoint %s. Max parallelism mismatch between checkpoint/savepoint state and new program. Cannot map operator %s with max parallelism %d to new program with max parallelism %d. This indicates that the program has been changed in a non-compatible way after the checkpoint/savepoint.", checkpointPointer, operatorState.getOperatorID(), operatorState.getMaxParallelism(), executionJobVertex.getMaxParallelism());
                throw new IllegalStateException(msg);
            }

            if (allowNonRestoredState) {
                LOG.info("Skipping savepoint state for operator {}.", operatorState.getOperatorID());
                continue;
            }

            // Unmapped state may only be dropped silently when it is actually empty.
            for (OperatorSubtaskState operatorSubtaskState : operatorState.getStates()) {
                if (!operatorSubtaskState.hasState()) {
                    continue;
                }
                String msg = String.format("Failed to rollback to checkpoint/savepoint %s. Cannot map checkpoint/savepoint state for operator %s to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.", checkpointPointer, operatorState.getOperatorID());
                throw new IllegalStateException(msg);
            }
            LOG.info("Skipping empty savepoint state for operator {}.", operatorState.getOperatorID());
        }

        // NOTE(review): the two 0L arguments are presumably the checkpoint's timestamps;
        // confirm against the CompletedCheckpoint constructor.
        CheckpointProperties props = CheckpointProperties.forSavepoint();
        return new CompletedCheckpoint(jobId, checkpointMetadata.getCheckpointId(), 0L, 0L, operatorStates, checkpointMetadata.getMasterStates(), props, location);
    }

    /**
     * Disposes the savepoint that {@code pointer} resolves to on the given state backend.
     *
     * <p>The metadata is loaded first; if that fails the exception propagates and nothing is
     * discarded. Afterwards three cleanup steps run in order: discarding the metadata handle,
     * disposing the savepoint, and disposing the storage location. Each step is attempted even
     * when an earlier one failed. Failures are combined with
     * {@link ExceptionUtils#firstOrSuppressed} and rethrown at the end through
     * {@link ExceptionUtils#rethrowIOException}.
     *
     * @param pointer external pointer to the savepoint
     * @param stateBackend backend used to resolve the pointer
     * @param classLoader class loader handed to the metadata deserializer
     * @throws IOException if the metadata cannot be read or a cleanup step failed
     * @throws FlinkException declared for callers; not thrown directly by this method body
     */
    public static void disposeSavepoint(String pointer, StateBackend stateBackend, ClassLoader classLoader) throws IOException, FlinkException {
        Preconditions.checkNotNull(pointer, "location");
        Preconditions.checkNotNull(stateBackend, "stateBackend");
        Preconditions.checkNotNull(classLoader, "classLoader");

        CompletedCheckpointStorageLocation checkpointLocation = stateBackend.resolveCheckpoint(pointer);
        StreamStateHandle metadataHandle = checkpointLocation.getMetadataHandle();

        Savepoint savepoint;
        try (FSDataInputStream in = metadataHandle.openInputStream();
             DataInputStream dis = new DataInputStream(in)) {
            savepoint = loadCheckpointMetadata(dis, classLoader);
        }

        Exception exception = null;

        try {
            metadataHandle.discardState();
        } catch (Exception e) {
            exception = e;
        }

        try {
            savepoint.dispose();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        try {
            checkpointLocation.disposeStorageLocation();
        } catch (Exception e) {
            exception = ExceptionUtils.firstOrSuppressed(e, exception);
        }

        if (exception != null) {
            ExceptionUtils.rethrowIOException(exception);
        }
    }

    /**
     * Disposes the savepoint at {@code pointer} using the state backend obtained from
     * {@link #loadStateBackend(Configuration, ClassLoader, Logger)}.
     *
     * @param pointer external pointer to the savepoint
     * @param configuration configuration the state backend is loaded from
     * @param classLoader class loader used for loading the backend and the metadata
     * @param logger optional logger for progress messages; may be {@code null}
     * @throws IOException if the metadata cannot be read or a cleanup step failed
     * @throws FlinkException declared for callers; see the backend-based overload
     */
    public static void disposeSavepoint(String pointer, Configuration configuration, ClassLoader classLoader, @Nullable Logger logger) throws IOException, FlinkException {
        Preconditions.checkNotNull(pointer, "location");
        Preconditions.checkNotNull(configuration, "configuration");
        Preconditions.checkNotNull(classLoader, "classLoader");

        StateBackend backend = loadStateBackend(configuration, classLoader, logger);
        disposeSavepoint(pointer, backend, classLoader);
    }

    /**
     * Loads the state backend described by {@code configuration}, falling back to a
     * {@link MemoryStateBackend} when none is configured or loading fails.
     *
     * <p>Loading is best-effort: any {@link Throwable} raised while loading is logged (when a
     * logger is supplied) and otherwise ignored, so this method always returns a backend.
     *
     * @param configuration configuration to read the backend from
     * @param classLoader class loader used to load the backend
     * @param logger optional logger for progress messages; may be {@code null}
     * @return the configured backend, or a new {@link MemoryStateBackend} as fallback
     */
    @Nonnull
    public static StateBackend loadStateBackend(Configuration configuration, ClassLoader classLoader, @Nullable Logger logger) {
        if (logger != null) {
            logger.info("Attempting to load configured state backend for savepoint disposal");
        }

        StateBackend backend = null;
        try {
            backend = StateBackendLoader.loadStateBackendFromConfig(configuration, classLoader, null);
            if (backend == null && logger != null) {
                logger.info("No state backend configured, attempting to dispose savepoint with default backend (file system based)");
            }
        } catch (Throwable t) {
            // Deliberately broad: errors raised while loading the backend also lead to the
            // fallback below instead of aborting.
            if (logger != null) {
                logger.info("Could not load configured state backend.");
                logger.debug("Detailed exception:", t);
            }
        }

        if (backend == null) {
            backend = new MemoryStateBackend();
        }
        return backend;
    }

    /** Not instantiable; this class only hosts static helpers. */
    private Checkpoints() {
    }
}

