/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
import org.apache.flink.streaming.runtime.tasks.CheckpointExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.CheckpointExceptionHandlerFactory;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
extends AbstractInvokable
implements AsyncExceptionHandler {
    public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
    private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    private final Object lock = new Object();
    protected OP headOperator;
    protected OperatorChain<OUT, OP> operatorChain;
    protected final StreamConfig configuration;
    protected StateBackend stateBackend;
    private CheckpointStorage checkpointStorage;
    protected ProcessingTimeService timerService;
    private final Map<String, Accumulator<?, ?>> accumulatorMap;
    private final CloseableRegistry cancelables = new CloseableRegistry();
    private volatile boolean isRunning;
    private volatile boolean canceled;
    private ExecutorService asyncOperationsThreadPool;
    private CheckpointExceptionHandler synchronousCheckpointExceptionHandler;
    private AsyncCheckpointExceptionHandler asynchronousCheckpointExceptionHandler;
    private final List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters;

    protected StreamTask(Environment env) {
        this(env, null);
    }

    protected StreamTask(Environment environment, @Nullable ProcessingTimeService timeProvider) {
        super(environment);
        this.timerService = timeProvider;
        this.configuration = new StreamConfig(this.getTaskConfiguration());
        this.accumulatorMap = this.getEnvironment().getAccumulatorRegistry().getUserMap();
        this.recordWriters = StreamTask.createRecordWriters(this.configuration, environment);
    }

    protected abstract void init() throws Exception;

    protected abstract void run() throws Exception;

    protected abstract void cleanup() throws Exception;

    protected abstract void cancelTask() throws Exception;

    public StreamTaskStateInitializer createStreamTaskStateInitializer() {
        return new StreamTaskStateInitializerImpl(this.getEnvironment(), this.stateBackend, this.timerService);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void invoke() throws Exception {
        boolean disposed = false;
        try {
            LOG.debug("Initializing {}.", (Object)this.getName());
            this.asyncOperationsThreadPool = Executors.newCachedThreadPool();
            CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = this.createCheckpointExceptionHandlerFactory();
            this.synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(this.getExecutionConfig().isFailTaskOnCheckpointError(), this.getEnvironment());
            this.asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);
            this.stateBackend = this.createStateBackend();
            this.checkpointStorage = this.stateBackend.createCheckpointStorage(this.getEnvironment().getJobID());
            if (this.timerService == null) {
                DispatcherThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + this.getName(), this.getUserCodeClassLoader());
                this.timerService = new SystemProcessingTimeService(this, this.getCheckpointLock(), (ThreadFactory)timerThreadFactory);
            }
            this.operatorChain = new OperatorChain(this, this.recordWriters);
            this.headOperator = this.operatorChain.getHeadOperator();
            this.init();
            if (this.canceled) {
                throw new CancelTaskException();
            }
            LOG.debug("Invoking {}", (Object)this.getName());
            Object object = this.lock;
            synchronized (object) {
                this.initializeState();
                this.openAllOperators();
            }
            if (this.canceled) {
                throw new CancelTaskException();
            }
            this.isRunning = true;
            this.run();
            if (this.canceled) {
                throw new CancelTaskException();
            }
            LOG.debug("Finished task {}", (Object)this.getName());
            object = this.lock;
            synchronized (object) {
                this.closeAllOperators();
                this.timerService.quiesce();
                this.isRunning = false;
            }
            this.timerService.awaitPendingAfterQuiesce();
            LOG.debug("Closed operators for task {}", (Object)this.getName());
            this.operatorChain.flushOutputs();
            this.tryDisposeAllOperators();
            disposed = true;
        }
        finally {
            this.isRunning = false;
            this.setShouldInterruptOnCancel(false);
            Thread.interrupted();
            this.tryShutdownTimerService();
            try {
                this.cancelables.close();
                this.shutdownAsyncThreads();
            }
            catch (Throwable t) {
                LOG.error("Could not shut down async checkpoint threads", t);
            }
            try {
                this.cleanup();
            }
            catch (Throwable t) {
                LOG.error("Error during cleanup of stream task", t);
            }
            if (!disposed) {
                this.disposeAllOperators();
            }
            if (this.operatorChain != null) {
                Object object = this.lock;
                synchronized (object) {
                    this.operatorChain.releaseOutputs();
                }
            }
        }
    }

    public final void cancel() throws Exception {
        this.isRunning = false;
        this.canceled = true;
        try {
            this.cancelTask();
        }
        finally {
            this.cancelables.close();
        }
    }

    public final boolean isRunning() {
        return this.isRunning;
    }

    public final boolean isCanceled() {
        return this.canceled;
    }

    private void openAllOperators() throws Exception {
        for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
            if (operator == null) continue;
            operator.open();
        }
    }

    private void closeAllOperators() throws Exception {
        StreamOperator<?>[] allOperators = this.operatorChain.getAllOperators();
        for (int i = allOperators.length - 1; i >= 0; --i) {
            StreamOperator<?> operator = allOperators[i];
            if (operator == null) continue;
            operator.close();
        }
    }

    private void tryDisposeAllOperators() throws Exception {
        for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
            if (operator == null) continue;
            operator.dispose();
        }
    }

    private void shutdownAsyncThreads() throws Exception {
        if (!this.asyncOperationsThreadPool.isShutdown()) {
            this.asyncOperationsThreadPool.shutdownNow();
        }
    }

    private void disposeAllOperators() {
        if (this.operatorChain != null) {
            for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
                try {
                    if (operator == null) continue;
                    operator.dispose();
                }
                catch (Throwable t) {
                    LOG.error("Error during disposal of stream operator.", t);
                }
            }
        }
    }

    protected void finalize() throws Throwable {
        super.finalize();
        if (this.timerService != null && !this.timerService.isTerminated()) {
            LOG.info("Timer service is shutting down.");
            this.timerService.shutdownService();
        }
        this.cancelables.close();
    }

    boolean isSerializingTimestamps() {
        TimeCharacteristic tc = this.configuration.getTimeCharacteristic();
        return tc == TimeCharacteristic.EventTime | tc == TimeCharacteristic.IngestionTime;
    }

    public String getName() {
        return this.getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
    }

    public Object getCheckpointLock() {
        return this.lock;
    }

    public CheckpointStorage getCheckpointStorage() {
        return this.checkpointStorage;
    }

    public StreamConfig getConfiguration() {
        return this.configuration;
    }

    public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
        return this.accumulatorMap;
    }

    public StreamStatusMaintainer getStreamStatusMaintainer() {
        return this.operatorChain;
    }

    RecordWriterOutput<?>[] getStreamOutputs() {
        return this.operatorChain.getStreamOutputs();
    }

    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        try {
            CheckpointMetrics checkpointMetrics = new CheckpointMetrics().setBytesBufferedInAlignment(0L).setAlignmentDurationNanos(0L);
            return this.performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
        }
        catch (Exception e) {
            if (this.isRunning) {
                throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + this.getName() + '.', e);
            }
            LOG.debug("Could not perform checkpoint {} for operator {} while the invokable was not in state running.", new Object[]{checkpointMetaData.getCheckpointId(), this.getName(), e});
            return false;
        }
    }

    public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
        try {
            this.performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
        }
        catch (CancelTaskException e) {
            LOG.info("Operator {} was cancelled while performing checkpoint {}.", (Object)this.getName(), (Object)checkpointMetaData.getCheckpointId());
            throw e;
        }
        catch (Exception e) {
            throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + this.getName() + '.', e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) throws Exception {
        LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", (Object)checkpointId, (Object)this.getName());
        this.getEnvironment().declineCheckpoint(checkpointId, cause);
        Object object = this.lock;
        synchronized (object) {
            this.operatorChain.broadcastCheckpointCancelMarker(checkpointId);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
        LOG.debug("Starting checkpoint ({}) {} on task {}", new Object[]{checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(), this.getName()});
        Object object = this.lock;
        synchronized (object) {
            if (this.isRunning) {
                this.operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());
                this.operatorChain.broadcastCheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions);
                this.checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                return true;
            }
            CancelCheckpointMarker message = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            Exception exception = null;
            for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter : this.recordWriters) {
                try {
                    recordWriter.broadcastEvent((AbstractEvent)message);
                }
                catch (Exception e) {
                    exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)new Exception("Could not send cancel checkpoint marker to downstream tasks.", e), exception);
                }
            }
            if (exception != null) {
                throw exception;
            }
            return false;
        }
    }

    public ExecutorService getAsyncOperationsThreadPool() {
        return this.asyncOperationsThreadPool;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            if (this.isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}", (Object)this.getName());
                for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
                    if (operator == null) continue;
                    operator.notifyCheckpointComplete(checkpointId);
                }
            } else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", (Object)this.getName());
            }
        }
    }

    private void tryShutdownTimerService() {
        if (this.timerService != null && !this.timerService.isTerminated()) {
            try {
                long timeoutMs = this.getEnvironment().getTaskManagerInfo().getConfiguration().getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
                if (!this.timerService.shutdownServiceUninterruptible(timeoutMs)) {
                    LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending timers. Will continue with shutdown procedure.", (Object)timeoutMs);
                }
            }
            catch (Throwable t) {
                LOG.error("Could not shut down timer service", t);
            }
        }
    }

    private void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
        CheckpointStreamFactory storage = this.checkpointStorage.resolveCheckpointStorageLocation(checkpointMetaData.getCheckpointId(), checkpointOptions.getTargetLocation());
        CheckpointingOperation checkpointingOperation = new CheckpointingOperation(this, checkpointMetaData, checkpointOptions, storage, checkpointMetrics);
        checkpointingOperation.executeCheckpointing();
    }

    private void initializeState() throws Exception {
        StreamOperator<?>[] allOperators;
        for (StreamOperator<?> operator : allOperators = this.operatorChain.getAllOperators()) {
            if (null == operator) continue;
            operator.initializeState();
        }
    }

    private StateBackend createStateBackend() throws Exception {
        StateBackend fromApplication = this.configuration.getStateBackend(this.getUserCodeClassLoader());
        return StateBackendLoader.fromApplicationOrConfigOrDefault((StateBackend)fromApplication, (Configuration)this.getEnvironment().getTaskManagerInfo().getConfiguration(), (ClassLoader)this.getUserCodeClassLoader(), (Logger)LOG);
    }

    protected CheckpointExceptionHandlerFactory createCheckpointExceptionHandlerFactory() {
        return new CheckpointExceptionHandlerFactory();
    }

    public ProcessingTimeService getProcessingTimeService() {
        if (this.timerService == null) {
            throw new IllegalStateException("The timer service has not been initialized.");
        }
        return this.timerService;
    }

    @Override
    public void handleAsyncException(String message, Throwable exception) {
        if (this.isRunning) {
            this.getEnvironment().failExternally(exception);
        }
    }

    public String toString() {
        return this.getName();
    }

    public CloseableRegistry getCancelables() {
        return this.cancelables;
    }

    @VisibleForTesting
    public static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(StreamConfig configuration, Environment environment) {
        ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
        List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder(environment.getUserClassLoader());
        Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigsWithSelf(environment.getUserClassLoader());
        for (int i = 0; i < outEdgesInOrder.size(); ++i) {
            StreamEdge edge = outEdgesInOrder.get(i);
            recordWriters.add(StreamTask.createRecordWriter(edge, i, environment, environment.getTaskInfo().getTaskName(), chainedConfigs.get(edge.getSourceId()).getBufferTimeout()));
        }
        return recordWriters;
    }

    private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(StreamEdge edge, int outputIndex, Environment environment, String taskName, long bufferTimeout) {
        int numKeyGroups;
        StreamPartitioner<?> outputPartitioner = edge.getPartitioner();
        LOG.debug("Using partitioner {} for output {} of task {}", new Object[]{outputPartitioner, outputIndex, taskName});
        ResultPartitionWriter bufferWriter = environment.getWriter(outputIndex);
        if (outputPartitioner instanceof ConfigurableStreamPartitioner && 0 < (numKeyGroups = bufferWriter.getNumTargetKeyGroups())) {
            ((ConfigurableStreamPartitioner)((Object)outputPartitioner)).configure(numKeyGroups);
        }
        RecordWriter output = RecordWriter.createRecordWriter((ResultPartitionWriter)bufferWriter, outputPartitioner, (long)bufferTimeout, (String)taskName);
        output.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
        return output;
    }

    static final class AsyncCheckpointExceptionHandler
    implements CheckpointExceptionHandler {
        final StreamTask<?, ?> owner;
        final CheckpointExceptionHandler synchronousCheckpointExceptionHandler;

        AsyncCheckpointExceptionHandler(StreamTask<?, ?> owner) {
            this.owner = (StreamTask)Preconditions.checkNotNull(owner);
            this.synchronousCheckpointExceptionHandler = (CheckpointExceptionHandler)Preconditions.checkNotNull((Object)((StreamTask)owner).synchronousCheckpointExceptionHandler);
        }

        @Override
        public void tryHandleCheckpointException(CheckpointMetaData checkpointMetaData, Exception exception) {
            try {
                this.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, exception);
            }
            catch (Exception unhandled) {
                AsynchronousException asyncException = new AsynchronousException(unhandled);
                this.owner.handleAsyncException("Failure in asynchronous checkpoint materialization", asyncException);
            }
        }
    }

    private static final class CheckpointingOperation {
        private final StreamTask<?, ?> owner;
        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointOptions checkpointOptions;
        private final CheckpointMetrics checkpointMetrics;
        private final CheckpointStreamFactory storageLocation;
        private final StreamOperator<?>[] allOperators;
        private long startSyncPartNano;
        private long startAsyncPartNano;
        private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;

        public CheckpointingOperation(StreamTask<?, ?> owner, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointStreamFactory checkpointStorageLocation, CheckpointMetrics checkpointMetrics) {
            this.owner = (StreamTask)Preconditions.checkNotNull(owner);
            this.checkpointMetaData = (CheckpointMetaData)Preconditions.checkNotNull((Object)checkpointMetaData);
            this.checkpointOptions = (CheckpointOptions)Preconditions.checkNotNull((Object)checkpointOptions);
            this.checkpointMetrics = (CheckpointMetrics)Preconditions.checkNotNull((Object)checkpointMetrics);
            this.storageLocation = (CheckpointStreamFactory)Preconditions.checkNotNull((Object)checkpointStorageLocation);
            this.allOperators = owner.operatorChain.getAllOperators();
            this.operatorSnapshotsInProgress = new HashMap<OperatorID, OperatorSnapshotFutures>(this.allOperators.length);
        }

        public void executeCheckpointing() throws Exception {
            this.startSyncPartNano = System.nanoTime();
            try {
                for (StreamOperator<?> op : this.allOperators) {
                    this.checkpointStreamOperator(op);
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", (Object)this.checkpointMetaData.getCheckpointId(), (Object)this.owner.getName());
                }
                this.startAsyncPartNano = System.nanoTime();
                this.checkpointMetrics.setSyncDurationMillis((this.startAsyncPartNano - this.startSyncPartNano) / 1000000L);
                AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(this.owner, this.operatorSnapshotsInProgress, this.checkpointMetaData, this.checkpointMetrics, this.startAsyncPartNano);
                ((StreamTask)this.owner).cancelables.registerCloseable((Closeable)asyncCheckpointRunnable);
                ((StreamTask)this.owner).asyncOperationsThreadPool.submit(asyncCheckpointRunnable);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - finished synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms", new Object[]{this.owner.getName(), this.checkpointMetaData.getCheckpointId(), this.checkpointMetrics.getAlignmentDurationNanos() / 1000000L, this.checkpointMetrics.getSyncDurationMillis()});
                }
            }
            catch (Exception ex) {
                for (OperatorSnapshotFutures operatorSnapshotResult : this.operatorSnapshotsInProgress.values()) {
                    if (null == operatorSnapshotResult) continue;
                    try {
                        operatorSnapshotResult.cancel();
                    }
                    catch (Exception e) {
                        LOG.warn("Could not properly cancel an operator snapshot result.", (Throwable)e);
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms", new Object[]{this.owner.getName(), this.checkpointMetaData.getCheckpointId(), this.checkpointMetrics.getAlignmentDurationNanos() / 1000000L, this.checkpointMetrics.getSyncDurationMillis()});
                }
                ((StreamTask)this.owner).synchronousCheckpointExceptionHandler.tryHandleCheckpointException(this.checkpointMetaData, ex);
            }
        }

        private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
            if (null != op) {
                OperatorSnapshotFutures snapshotInProgress = op.snapshotState(this.checkpointMetaData.getCheckpointId(), this.checkpointMetaData.getTimestamp(), this.checkpointOptions, this.storageLocation);
                this.operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
            }
        }

        private static enum AsyncCheckpointState {
            RUNNING,
            DISCARDED,
            COMPLETED;

        }
    }

    @VisibleForTesting
    protected static final class AsyncCheckpointRunnable
    implements Runnable,
    Closeable {
        private final StreamTask<?, ?> owner;
        private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointMetrics checkpointMetrics;
        private final long asyncStartNanos;
        private final AtomicReference<CheckpointingOperation.AsyncCheckpointState> asyncCheckpointState = new AtomicReference<CheckpointingOperation.AsyncCheckpointState>(CheckpointingOperation.AsyncCheckpointState.RUNNING);

        AsyncCheckpointRunnable(StreamTask<?, ?> owner, Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics, long asyncStartNanos) {
            this.owner = (StreamTask)Preconditions.checkNotNull(owner);
            this.operatorSnapshotsInProgress = (Map)Preconditions.checkNotNull(operatorSnapshotsInProgress);
            this.checkpointMetaData = (CheckpointMetaData)Preconditions.checkNotNull((Object)checkpointMetaData);
            this.checkpointMetrics = (CheckpointMetrics)Preconditions.checkNotNull((Object)checkpointMetrics);
            this.asyncStartNanos = asyncStartNanos;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            FileSystemSafetyNet.initializeSafetyNetForThread();
            try {
                TaskStateSnapshot jobManagerTaskOperatorSubtaskStates = new TaskStateSnapshot(this.operatorSnapshotsInProgress.size());
                TaskStateSnapshot localTaskOperatorSubtaskStates = new TaskStateSnapshot(this.operatorSnapshotsInProgress.size());
                for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : this.operatorSnapshotsInProgress.entrySet()) {
                    OperatorID operatorID = entry.getKey();
                    OperatorSnapshotFutures snapshotInProgress = entry.getValue();
                    OperatorSnapshotFinalizer finalizedSnapshots = new OperatorSnapshotFinalizer(snapshotInProgress);
                    jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID, finalizedSnapshots.getJobManagerOwnedState());
                    localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(operatorID, finalizedSnapshots.getTaskLocalState());
                }
                long asyncEndNanos = System.nanoTime();
                long asyncDurationMillis = (asyncEndNanos - this.asyncStartNanos) / 1000000L;
                this.checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
                if (this.asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING, CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
                    this.reportCompletedSnapshotStates(jobManagerTaskOperatorSubtaskStates, localTaskOperatorSubtaskStates, asyncDurationMillis);
                } else {
                    LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.", (Object)this.owner.getName(), (Object)this.checkpointMetaData.getCheckpointId());
                }
            }
            catch (Exception e) {
                this.handleExecutionException(e);
            }
            finally {
                ((StreamTask)this.owner).cancelables.unregisterCloseable((Closeable)this);
                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
            }
        }

        private void reportCompletedSnapshotStates(TaskStateSnapshot acknowledgedTaskStateSnapshot, TaskStateSnapshot localTaskStateSnapshot, long asyncDurationMillis) {
            TaskStateManager taskStateManager = this.owner.getEnvironment().getTaskStateManager();
            boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
            boolean hasLocalState = localTaskStateSnapshot.hasState();
            Preconditions.checkState((hasAckState || !hasLocalState ? 1 : 0) != 0, (Object)"Found cached state but no corresponding primary state is reported to the job manager. This indicates a problem.");
            taskStateManager.reportTaskStateSnapshots(this.checkpointMetaData, this.checkpointMetrics, (TaskStateSnapshot)(hasAckState ? acknowledgedTaskStateSnapshot : null), (TaskStateSnapshot)(hasLocalState ? localTaskStateSnapshot : null));
            LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", new Object[]{this.owner.getName(), this.checkpointMetaData.getCheckpointId(), asyncDurationMillis});
            LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.", new Object[]{this.owner.getName(), this.checkpointMetaData.getCheckpointId(), acknowledgedTaskStateSnapshot});
        }

        private void handleExecutionException(Exception e) {
            boolean didCleanup = false;
            CheckpointingOperation.AsyncCheckpointState currentState = this.asyncCheckpointState.get();
            while (CheckpointingOperation.AsyncCheckpointState.DISCARDED != currentState) {
                if (this.asyncCheckpointState.compareAndSet(currentState, CheckpointingOperation.AsyncCheckpointState.DISCARDED)) {
                    didCleanup = true;
                    try {
                        this.cleanup();
                    }
                    catch (Exception cleanupException) {
                        e.addSuppressed(cleanupException);
                    }
                    Exception checkpointException = new Exception("Could not materialize checkpoint " + this.checkpointMetaData.getCheckpointId() + " for operator " + this.owner.getName() + '.', e);
                    ((StreamTask)this.owner).asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(this.checkpointMetaData, checkpointException);
                    currentState = CheckpointingOperation.AsyncCheckpointState.DISCARDED;
                    continue;
                }
                currentState = this.asyncCheckpointState.get();
            }
            if (!didCleanup) {
                LOG.trace("Caught followup exception from a failed checkpoint thread. This can be ignored.", (Throwable)e);
            }
        }

        @Override
        public void close() {
            if (this.asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING, CheckpointingOperation.AsyncCheckpointState.DISCARDED)) {
                try {
                    this.cleanup();
                }
                catch (Exception cleanupException) {
                    LOG.warn("Could not properly clean up the async checkpoint runnable.", (Throwable)cleanupException);
                }
            } else {
                this.logFailedCleanupAttempt();
            }
        }

        private void cleanup() throws Exception {
            LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", (Object)this.checkpointMetaData.getCheckpointId(), (Object)this.owner.getName());
            Exception exception = null;
            for (OperatorSnapshotFutures operatorSnapshotResult : this.operatorSnapshotsInProgress.values()) {
                if (operatorSnapshotResult == null) continue;
                try {
                    operatorSnapshotResult.cancel();
                }
                catch (Exception cancelException) {
                    exception = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)cancelException, exception);
                }
            }
            if (null != exception) {
                throw exception;
            }
        }

        private void logFailedCleanupAttempt() {
            LOG.debug("{} - asynchronous checkpointing operation for checkpoint {} has already been completed. Thus, the state handles are not cleaned up.", (Object)this.owner.getName(), (Object)this.checkpointMetaData.getCheckpointId());
        }
    }
}

