/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
import org.apache.flink.runtime.io.network.partition.PartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;

public class LegacyScheduler
implements SchedulerNG {
    private final Logger log;
    private final JobGraph jobGraph;
    private final ExecutionGraph executionGraph;
    private final BackPressureStatsTracker backPressureStatsTracker;
    private final Executor ioExecutor;
    private final Configuration jobMasterConfiguration;
    private final SlotProvider slotProvider;
    private final ScheduledExecutorService futureExecutor;
    private final ClassLoader userCodeLoader;
    private final CheckpointRecoveryFactory checkpointRecoveryFactory;
    private final Time rpcTimeout;
    private final RestartStrategy restartStrategy;
    private final BlobWriter blobWriter;
    private final Time slotRequestTimeout;
    private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor("LegacyScheduler is not initialized with proper main thread executor. Call to LegacyScheduler.setMainThreadExecutor(...) required.");

    public LegacyScheduler(Logger log, JobGraph jobGraph, BackPressureStatsTracker backPressureStatsTracker, Executor ioExecutor, Configuration jobMasterConfiguration, SlotProvider slotProvider, ScheduledExecutorService futureExecutor, ClassLoader userCodeLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, Time rpcTimeout, RestartStrategyFactory restartStrategyFactory, BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, Time slotRequestTimeout, ShuffleMaster<?> shuffleMaster, PartitionTracker partitionTracker) throws Exception {
        this.log = (Logger)Preconditions.checkNotNull((Object)log);
        this.jobGraph = (JobGraph)Preconditions.checkNotNull((Object)jobGraph);
        this.backPressureStatsTracker = (BackPressureStatsTracker)Preconditions.checkNotNull((Object)backPressureStatsTracker);
        this.ioExecutor = (Executor)Preconditions.checkNotNull((Object)ioExecutor);
        this.jobMasterConfiguration = (Configuration)Preconditions.checkNotNull((Object)jobMasterConfiguration);
        this.slotProvider = (SlotProvider)Preconditions.checkNotNull((Object)slotProvider);
        this.futureExecutor = (ScheduledExecutorService)Preconditions.checkNotNull((Object)futureExecutor);
        this.userCodeLoader = (ClassLoader)Preconditions.checkNotNull((Object)userCodeLoader);
        this.checkpointRecoveryFactory = (CheckpointRecoveryFactory)Preconditions.checkNotNull((Object)checkpointRecoveryFactory);
        this.rpcTimeout = (Time)Preconditions.checkNotNull((Object)rpcTimeout);
        RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = ((ExecutionConfig)jobGraph.getSerializedExecutionConfig().deserializeValue(userCodeLoader)).getRestartStrategy();
        this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration, restartStrategyFactory, jobGraph.isCheckpointingEnabled());
        log.info("Using restart strategy {} for {} ({}).", new Object[]{this.restartStrategy, jobGraph.getName(), jobGraph.getJobID()});
        this.blobWriter = (BlobWriter)Preconditions.checkNotNull((Object)blobWriter);
        this.slotRequestTimeout = (Time)Preconditions.checkNotNull((Object)slotRequestTimeout);
        this.executionGraph = this.createAndRestoreExecutionGraph(jobManagerJobMetricGroup, (ShuffleMaster)Preconditions.checkNotNull(shuffleMaster), (PartitionTracker)Preconditions.checkNotNull((Object)partitionTracker));
    }

    private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup, ShuffleMaster<?> shuffleMaster, PartitionTracker partitionTracker) throws Exception {
        ExecutionGraph newExecutionGraph = this.createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);
        CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator != null && !checkpointCoordinator.restoreLatestCheckpointedState(newExecutionGraph.getAllVertices(), false, false)) {
            this.tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, this.jobGraph.getSavepointRestoreSettings());
        }
        return newExecutionGraph;
    }

    private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup currentJobManagerJobMetricGroup, ShuffleMaster<?> shuffleMaster, PartitionTracker partitionTracker) throws JobExecutionException, JobException {
        return ExecutionGraphBuilder.buildGraph(null, this.jobGraph, this.jobMasterConfiguration, this.futureExecutor, this.ioExecutor, this.slotProvider, this.userCodeLoader, this.checkpointRecoveryFactory, this.rpcTimeout, this.restartStrategy, currentJobManagerJobMetricGroup, this.blobWriter, this.slotRequestTimeout, this.log, shuffleMaster, partitionTracker);
    }

    private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraphToRestore, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
        CheckpointCoordinator checkpointCoordinator;
        if (savepointRestoreSettings.restoreSavepoint() && (checkpointCoordinator = executionGraphToRestore.getCheckpointCoordinator()) != null) {
            checkpointCoordinator.restoreSavepoint(savepointRestoreSettings.getRestorePath(), savepointRestoreSettings.allowNonRestoredState(), executionGraphToRestore.getAllVertices(), this.userCodeLoader);
        }
    }

    @Override
    public void setMainThreadExecutor(ComponentMainThreadExecutor mainThreadExecutor) {
        this.mainThreadExecutor = (ComponentMainThreadExecutor)Preconditions.checkNotNull((Object)mainThreadExecutor);
        this.executionGraph.start(mainThreadExecutor);
    }

    @Override
    public void registerJobStatusListener(JobStatusListener jobStatusListener) {
        this.executionGraph.registerJobStatusListener(jobStatusListener);
    }

    @Override
    public void startScheduling() {
        this.mainThreadExecutor.assertRunningInMainThread();
        try {
            this.executionGraph.scheduleForExecution();
        }
        catch (Throwable t) {
            this.executionGraph.failGlobal(t);
        }
    }

    @Override
    public void suspend(Throwable cause) {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.executionGraph.suspend(cause);
    }

    @Override
    public void cancel() {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.executionGraph.cancel();
    }

    @Override
    public CompletableFuture<Void> getTerminationFuture() {
        return this.executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
    }

    @Override
    public boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) {
        this.mainThreadExecutor.assertRunningInMainThread();
        return this.executionGraph.updateState(taskExecutionState);
    }

    @Override
    public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
        this.mainThreadExecutor.assertRunningInMainThread();
        Execution execution = this.executionGraph.getRegisteredExecutions().get((Object)executionAttempt);
        if (execution == null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Can not find Execution for attempt {}.", (Object)executionAttempt);
            }
            throw new IllegalArgumentException("Can not find Execution for attempt " + (Object)((Object)executionAttempt));
        }
        ExecutionJobVertex vertex = this.executionGraph.getJobVertex(vertexID);
        if (vertex == null) {
            throw new IllegalArgumentException("Cannot find execution vertex for vertex ID " + (Object)((Object)vertexID));
        }
        if (vertex.getSplitAssigner() == null) {
            throw new IllegalStateException("No InputSplitAssigner for vertex ID " + (Object)((Object)vertexID));
        }
        InputSplit nextInputSplit = execution.getNextInputSplit();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Send next input split {}.", (Object)nextInputSplit);
        }
        try {
            byte[] serializedInputSplit = InstantiationUtil.serializeObject((Object)nextInputSplit);
            return new SerializedInputSplit(serializedInputSplit);
        }
        catch (Exception ex) {
            IOException reason = new IOException("Could not serialize the next input split of class " + nextInputSplit.getClass() + ".", ex);
            vertex.fail(reason);
            throw reason;
        }
    }

    @Override
    public ExecutionState requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {
        this.mainThreadExecutor.assertRunningInMainThread();
        Execution execution = this.executionGraph.getRegisteredExecutions().get((Object)resultPartitionId.getProducerId());
        if (execution != null) {
            return execution.getState();
        }
        IntermediateResult intermediateResult = this.executionGraph.getAllIntermediateResults().get((Object)intermediateResultId);
        if (intermediateResult != null) {
            Execution producerExecution = intermediateResult.getPartitionById(resultPartitionId.getPartitionId()).getProducer().getCurrentExecutionAttempt();
            if (producerExecution.getAttemptId().equals((Object)resultPartitionId.getProducerId())) {
                return producerExecution.getState();
            }
            throw new PartitionProducerDisposedException(resultPartitionId);
        }
        throw new IllegalArgumentException("Intermediate data set with ID " + (Object)((Object)intermediateResultId) + " not found.");
    }

    @Override
    public void scheduleOrUpdateConsumers(ResultPartitionID partitionID) {
        this.mainThreadExecutor.assertRunningInMainThread();
        try {
            this.executionGraph.scheduleOrUpdateConsumers(partitionID);
        }
        catch (ExecutionGraphException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public ArchivedExecutionGraph requestJob() {
        this.mainThreadExecutor.assertRunningInMainThread();
        return ArchivedExecutionGraph.createFrom(this.executionGraph);
    }

    @Override
    public JobStatus requestJobStatus() {
        return this.executionGraph.getState();
    }

    @Override
    public JobDetails requestJobDetails() {
        this.mainThreadExecutor.assertRunningInMainThread();
        return WebMonitorUtils.createDetailsForJob(this.executionGraph);
    }

    @Override
    public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException {
        this.mainThreadExecutor.assertRunningInMainThread();
        if (this.jobGraph.getJobID().equals((Object)jobId)) {
            KvStateLocationRegistry registry;
            KvStateLocation location;
            if (this.log.isDebugEnabled()) {
                this.log.debug("Lookup key-value state for job {} with registration name {}.", (Object)this.jobGraph.getJobID(), (Object)registrationName);
            }
            if ((location = (registry = this.executionGraph.getKvStateLocationRegistry()).getKvStateLocation(registrationName)) != null) {
                return location;
            }
            throw new UnknownKvStateLocation(registrationName);
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Request of key-value state location for unknown job {} received.", (Object)jobId);
        }
        throw new FlinkJobNotFoundException(jobId);
    }

    @Override
    public void notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress) throws FlinkJobNotFoundException {
        this.mainThreadExecutor.assertRunningInMainThread();
        if (this.jobGraph.getJobID().equals((Object)jobId)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Key value state registered for job {} under name {}.", (Object)this.jobGraph.getJobID(), (Object)registrationName);
            }
            try {
                this.executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        } else {
            throw new FlinkJobNotFoundException(jobId);
        }
    }

    @Override
    public void notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) throws FlinkJobNotFoundException {
        this.mainThreadExecutor.assertRunningInMainThread();
        if (this.jobGraph.getJobID().equals((Object)jobId)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Key value state unregistered for job {} under name {}.", (Object)this.jobGraph.getJobID(), (Object)registrationName);
            }
            try {
                this.executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(jobVertexId, keyGroupRange, registrationName);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        } else {
            throw new FlinkJobNotFoundException(jobId);
        }
    }

    @Override
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.executionGraph.updateAccumulators(accumulatorSnapshot);
    }

    @Override
    public Optional<OperatorBackPressureStats> requestOperatorBackPressureStats(JobVertexID jobVertexId) throws FlinkException {
        ExecutionJobVertex jobVertex = this.executionGraph.getJobVertex(jobVertexId);
        if (jobVertex == null) {
            throw new FlinkException("JobVertexID not found " + (Object)((Object)jobVertexId));
        }
        return this.backPressureStatsTracker.getOperatorBackPressureStats(jobVertex);
    }

    @Override
    public CompletableFuture<String> triggerSavepoint(String targetDirectory, boolean cancelJob) {
        this.mainThreadExecutor.assertRunningInMainThread();
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator == null) {
            throw new IllegalStateException(String.format("Job %s is not a streaming job.", this.jobGraph.getJobID()));
        }
        if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
            this.log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", (Object)this.jobGraph.getJobID());
            throw new IllegalStateException("No savepoint directory configured. You can either specify a directory while cancelling via -s :targetDirectory or configure a cluster-wide default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
        }
        if (cancelJob) {
            checkpointCoordinator.stopCheckpointScheduler();
        }
        return ((CompletableFuture)checkpointCoordinator.triggerSavepoint(System.currentTimeMillis(), targetDirectory).thenApply(CompletedCheckpoint::getExternalPointer)).handleAsync((path, throwable) -> {
            if (throwable != null) {
                if (cancelJob) {
                    this.startCheckpointScheduler(checkpointCoordinator);
                }
                throw new CompletionException((Throwable)throwable);
            }
            if (cancelJob) {
                this.log.info("Savepoint stored in {}. Now cancelling {}.", path, (Object)this.jobGraph.getJobID());
                this.cancel();
            }
            return path;
        }, (Executor)this.mainThreadExecutor);
    }

    private void startCheckpointScheduler(CheckpointCoordinator checkpointCoordinator) {
        this.mainThreadExecutor.assertRunningInMainThread();
        if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
            try {
                checkpointCoordinator.startCheckpointScheduler();
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
        }
    }

    @Override
    public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot checkpointState) {
        this.mainThreadExecutor.assertRunningInMainThread();
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState);
        String taskManagerLocationInfo = this.retrieveTaskManagerLocation(executionAttemptID);
        if (checkpointCoordinator != null) {
            this.ioExecutor.execute(() -> {
                try {
                    checkpointCoordinator.receiveAcknowledgeMessage(ackMessage, taskManagerLocationInfo);
                }
                catch (Throwable t) {
                    this.log.warn("Error while processing checkpoint acknowledgement message", t);
                }
            });
        } else {
            String errorMessage = "Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
            if (this.executionGraph.getState() == JobStatus.RUNNING) {
                this.log.error(errorMessage, (Object)this.jobGraph.getJobID());
            } else {
                this.log.debug(errorMessage, (Object)this.jobGraph.getJobID());
            }
        }
    }

    @Override
    public void declineCheckpoint(DeclineCheckpoint decline) {
        this.mainThreadExecutor.assertRunningInMainThread();
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        String taskManagerLocationInfo = this.retrieveTaskManagerLocation(decline.getTaskExecutionId());
        if (checkpointCoordinator != null) {
            this.ioExecutor.execute(() -> {
                try {
                    checkpointCoordinator.receiveDeclineMessage(decline, taskManagerLocationInfo);
                }
                catch (Exception e) {
                    this.log.error("Error in CheckpointCoordinator while processing {}", (Object)decline, (Object)e);
                }
            });
        } else {
            String errorMessage = "Received DeclineCheckpoint message for job {} with no CheckpointCoordinator";
            if (this.executionGraph.getState() == JobStatus.RUNNING) {
                this.log.error(errorMessage, (Object)this.jobGraph.getJobID());
            } else {
                this.log.debug(errorMessage, (Object)this.jobGraph.getJobID());
            }
        }
    }

    @Override
    public CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean advanceToEndOfEventTime) {
        this.mainThreadExecutor.assertRunningInMainThread();
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator == null) {
            return FutureUtils.completedExceptionally(new IllegalStateException(String.format("Job %s is not a streaming job.", this.jobGraph.getJobID())));
        }
        if (targetDirectory == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
            this.log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", (Object)this.jobGraph.getJobID());
            return FutureUtils.completedExceptionally(new IllegalStateException("No savepoint directory configured. You can either specify a directory while cancelling via -s :targetDirectory or configure a cluster-wide default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
        }
        checkpointCoordinator.stopCheckpointScheduler();
        long now = System.currentTimeMillis();
        CompletionStage savepointFuture = checkpointCoordinator.triggerSynchronousSavepoint(now, advanceToEndOfEventTime, targetDirectory).thenApply(CompletedCheckpoint::getExternalPointer);
        CompletionStage terminationFuture = this.executionGraph.getTerminationFuture().handle((jobstatus, throwable) -> {
            if (throwable != null) {
                this.log.info("Failed during stopping job {} with a savepoint. Reason: {}", (Object)this.jobGraph.getJobID(), (Object)throwable.getMessage());
                throw new CompletionException((Throwable)throwable);
            }
            if (jobstatus != JobStatus.FINISHED) {
                this.log.info("Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", (Object)this.jobGraph.getJobID(), (Object)jobstatus);
                throw new CompletionException(new FlinkException("Reached state " + (Object)jobstatus + " instead of FINISHED."));
            }
            return jobstatus;
        });
        return ((CompletableFuture)savepointFuture).thenCompose(arg_0 -> LegacyScheduler.lambda$stopWithSavepoint$5((CompletableFuture)terminationFuture, arg_0));
    }

    private String retrieveTaskManagerLocation(ExecutionAttemptID executionAttemptID) {
        Optional<Execution> currentExecution = Optional.ofNullable(this.executionGraph.getRegisteredExecutions().get((Object)executionAttemptID));
        return currentExecution.map(Execution::getAssignedResourceLocation).map(TaskManagerLocation::toString).orElse("Unknown location");
    }

    private static /* synthetic */ CompletionStage lambda$stopWithSavepoint$5(CompletableFuture terminationFuture, String path) {
        return terminationFuture.thenApply(jobStatus -> path);
    }
}

