/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReaderImpl;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TaskStateManager} implementation that ties together three collaborators for a single
 * task execution attempt: the restore information handed over by the job manager (may be
 * absent), a {@link TaskLocalStateStore} holding task-local state copies, and a
 * {@link CheckpointResponder} used to acknowledge checkpoints.
 *
 * <p>It also owns a {@link SequentialChannelStateReader}, which is closed in {@link #close()}.
 */
public class TaskStateManagerImpl
implements TaskStateManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskStateManagerImpl.class);

    /** Job this task belongs to; forwarded when acknowledging checkpoints. */
    private final JobID jobId;

    /** Execution attempt of this task; forwarded when acknowledging checkpoints. */
    private final ExecutionAttemptID executionAttemptID;

    /** Restore information from the job manager, or {@code null} if there is none. */
    @Nullable
    private final JobManagerTaskRestore jobManagerTaskRestore;

    /** Store for the task-local state snapshots. */
    private final TaskLocalStateStore localStateStore;

    /** Receiver of checkpoint acknowledgements. */
    private final CheckpointResponder checkpointResponder;

    /** Channel state reader exposed to callers and closed together with this manager. */
    private final SequentialChannelStateReader sequentialChannelStateReader;

    /**
     * Creates a manager whose channel state reader is built from the job manager's task state
     * snapshot, or from an empty {@link TaskStateSnapshot} when no restore information is given.
     *
     * @param jobId the job id used for checkpoint acknowledgements
     * @param executionAttemptID the execution attempt id used for checkpoint acknowledgements
     * @param localStateStore the store for task-local state
     * @param jobManagerTaskRestore restore information from the job manager, may be {@code null}
     * @param checkpointResponder the responder that receives checkpoint acknowledgements
     */
    public TaskStateManagerImpl(@Nonnull JobID jobId, @Nonnull ExecutionAttemptID executionAttemptID, @Nonnull TaskLocalStateStore localStateStore, @Nullable JobManagerTaskRestore jobManagerTaskRestore, @Nonnull CheckpointResponder checkpointResponder) {
        this(
                jobId,
                executionAttemptID,
                localStateStore,
                jobManagerTaskRestore,
                checkpointResponder,
                new SequentialChannelStateReaderImpl(snapshotForChannelReader(jobManagerTaskRestore)));
    }

    /**
     * Creates a manager with an explicitly supplied channel state reader.
     *
     * @param jobId the job id used for checkpoint acknowledgements
     * @param executionAttemptID the execution attempt id used for checkpoint acknowledgements
     * @param localStateStore the store for task-local state
     * @param jobManagerTaskRestore restore information from the job manager, may be {@code null}
     * @param checkpointResponder the responder that receives checkpoint acknowledgements
     * @param sequentialChannelStateReader the reader returned by
     *     {@link #getSequentialChannelStateReader()} and closed by {@link #close()}
     */
    public TaskStateManagerImpl(@Nonnull JobID jobId, @Nonnull ExecutionAttemptID executionAttemptID, @Nonnull TaskLocalStateStore localStateStore, @Nullable JobManagerTaskRestore jobManagerTaskRestore, @Nonnull CheckpointResponder checkpointResponder, @Nonnull SequentialChannelStateReaderImpl sequentialChannelStateReader) {
        this.jobId = jobId;
        this.executionAttemptID = executionAttemptID;
        this.localStateStore = localStateStore;
        this.jobManagerTaskRestore = jobManagerTaskRestore;
        this.checkpointResponder = checkpointResponder;
        this.sequentialChannelStateReader = sequentialChannelStateReader;
    }

    /**
     * Picks the snapshot the default channel state reader is created from: the job manager's
     * snapshot when restore information exists, otherwise a fresh empty snapshot.
     */
    private static TaskStateSnapshot snapshotForChannelReader(@Nullable JobManagerTaskRestore restore) {
        if (restore == null) {
            return new TaskStateSnapshot();
        }
        return restore.getTaskStateSnapshot();
    }

    /**
     * Stores the local state for the checkpoint in the local state store and then acknowledges
     * the checkpoint, together with the acknowledged state, through the checkpoint responder.
     *
     * @param checkpointMetaData meta data supplying the checkpoint id
     * @param checkpointMetrics metrics forwarded with the acknowledgement
     * @param acknowledgedState state forwarded with the acknowledgement, may be {@code null}
     * @param localState state handed to the local state store, may be {@code null}
     */
    @Override
    public void reportTaskStateSnapshots(@Nonnull CheckpointMetaData checkpointMetaData, @Nonnull CheckpointMetrics checkpointMetrics, @Nullable TaskStateSnapshot acknowledgedState, @Nullable TaskStateSnapshot localState) {
        final long id = checkpointMetaData.getCheckpointId();

        // Local state is stored first; the acknowledgement is sent afterwards.
        localStateStore.storeLocalState(id, localState);
        checkpointResponder.acknowledgeCheckpoint(jobId, executionAttemptID, id, checkpointMetrics, acknowledgedState);
    }

    /**
     * Assembles the prioritized state for the given operator from the job manager's snapshot and,
     * if available, the matching task-local snapshot.
     *
     * <p>Returns {@link PrioritizedOperatorSubtaskState#emptyNotRestored()} when there is no
     * restore information at all or the job manager's snapshot has no entry for the operator.
     * Otherwise the local state store is asked to prune every checkpoint whose id differs from the
     * restore checkpoint id, and the local state for the operator (if any) is offered as the only
     * alternative to the job manager state.
     *
     * @param operatorID the operator to look up
     * @return the prioritized state for the operator, never {@code null}
     */
    @Override
    @Nonnull
    public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID) {
        if (jobManagerTaskRestore == null) {
            return PrioritizedOperatorSubtaskState.emptyNotRestored();
        }

        final OperatorSubtaskState remoteState =
                jobManagerTaskRestore.getTaskStateSnapshot().getSubtaskStateByOperatorID(operatorID);
        if (remoteState == null) {
            return PrioritizedOperatorSubtaskState.emptyNotRestored();
        }

        final long restoreCheckpointId = jobManagerTaskRestore.getRestoreCheckpointId();

        // The local snapshot is fetched before pruning; only other checkpoints are pruned.
        final TaskStateSnapshot localSnapshot = localStateStore.retrieveLocalState(restoreCheckpointId);
        localStateStore.pruneMatchingCheckpoints(checkpointId -> checkpointId != restoreCheckpointId);

        final List<OperatorSubtaskState> alternatives = localAlternatives(localSnapshot, operatorID);

        LOG.debug(
                "Operator {} has remote state {} from job manager and local state alternatives {} from local state store {}.",
                operatorID,
                remoteState,
                alternatives,
                localStateStore);

        // NOTE(review): the trailing 'true' presumably flags the state as restored - confirm
        // against PrioritizedOperatorSubtaskState.Builder.
        return new PrioritizedOperatorSubtaskState.Builder(remoteState, alternatives, true).build();
    }

    /**
     * Returns a single-element list with the operator's state from the local snapshot, or an
     * empty list when there is no local snapshot or it has no entry for the operator.
     */
    private static List<OperatorSubtaskState> localAlternatives(@Nullable TaskStateSnapshot localSnapshot, OperatorID operatorID) {
        if (localSnapshot == null) {
            return Collections.emptyList();
        }
        final OperatorSubtaskState localState = localSnapshot.getSubtaskStateByOperatorID(operatorID);
        if (localState == null) {
            return Collections.emptyList();
        }
        return Collections.singletonList(localState);
    }

    /**
     * Returns the local recovery configuration as provided by the local state store.
     *
     * @return the local state store's {@link LocalRecoveryConfig}
     */
    @Override
    @Nonnull
    public LocalRecoveryConfig createLocalRecoveryConfig() {
        return localStateStore.getLocalRecoveryConfig();
    }

    /**
     * Returns the channel state reader this manager was constructed with.
     *
     * @return the sequential channel state reader
     */
    @Override
    public SequentialChannelStateReader getSequentialChannelStateReader() {
        return sequentialChannelStateReader;
    }

    /**
     * Forwards the completion of a checkpoint to the local state store.
     *
     * <p>NOTE(review): likely overrides a method inherited through {@link TaskStateManager};
     * confirm before adding {@code @Override}.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if the local state store fails to confirm the checkpoint
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        localStateStore.confirmCheckpoint(checkpointId);
    }

    /**
     * Forwards the abortion of a checkpoint to the local state store.
     *
     * <p>NOTE(review): likely overrides a method inherited through {@link TaskStateManager};
     * confirm before adding {@code @Override}.
     *
     * @param checkpointId id of the aborted checkpoint
     */
    public void notifyCheckpointAborted(long checkpointId) {
        localStateStore.abortCheckpoint(checkpointId);
    }

    /**
     * Closes the sequential channel state reader.
     *
     * @throws Exception if closing the reader fails
     */
    @Override
    public void close() throws Exception {
        sequentialChannelStateReader.close();
    }
}

