/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
import org.apache.flink.runtime.state.AsyncSnapshotCallable;
import org.apache.flink.runtime.state.BackendWritableBroadcastState;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.OperatorBackendSerializationProxy;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.PartitionableListState;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;

class DefaultOperatorStateBackendSnapshotStrategy
extends AbstractSnapshotStrategy<OperatorStateHandle> {
    private final ClassLoader userClassLoader;
    private final boolean asynchronousSnapshots;
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
    private final CloseableRegistry closeStreamOnCancelRegistry;

    protected DefaultOperatorStateBackendSnapshotStrategy(ClassLoader userClassLoader, boolean asynchronousSnapshots, Map<String, PartitionableListState<?>> registeredOperatorStates, Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates, CloseableRegistry closeStreamOnCancelRegistry) {
        super("DefaultOperatorStateBackend snapshot");
        this.userClassLoader = userClassLoader;
        this.asynchronousSnapshots = asynchronousSnapshots;
        this.registeredOperatorStates = registeredOperatorStates;
        this.registeredBroadcastStates = registeredBroadcastStates;
        this.closeStreamOnCancelRegistry = closeStreamOnCancelRegistry;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @Nonnull
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(long checkpointId, long timestamp, final @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) throws IOException {
        if (this.registeredOperatorStates.isEmpty() && this.registeredBroadcastStates.isEmpty()) {
            return DoneFuture.of(SnapshotResult.empty());
        }
        final HashMap registeredOperatorStatesDeepCopies = new HashMap(this.registeredOperatorStates.size());
        final HashMap registeredBroadcastStatesDeepCopies = new HashMap(this.registeredBroadcastStates.size());
        ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(this.userClassLoader);
        try {
            if (!this.registeredOperatorStates.isEmpty()) {
                for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredOperatorStates.entrySet()) {
                    PartitionableListState<?> listState = entry.getValue();
                    if (null != listState) {
                        listState = listState.deepCopy();
                    }
                    registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
                }
            }
            if (!this.registeredBroadcastStates.isEmpty()) {
                for (Map.Entry<String, Object> entry : this.registeredBroadcastStates.entrySet()) {
                    BackendWritableBroadcastState broadcastState = (BackendWritableBroadcastState)entry.getValue();
                    if (null != broadcastState) {
                        broadcastState = broadcastState.deepCopy();
                    }
                    registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
                }
            }
        }
        finally {
            Thread.currentThread().setContextClassLoader(snapshotClassLoader);
        }
        AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable = new AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>(){

            @Override
            protected SnapshotResult<OperatorStateHandle> callInternal() throws Exception {
                OperatorStateHandle.Mode mode;
                long[] partitionOffsets;
                Object value;
                CheckpointStreamFactory.CheckpointStateOutputStream localOut = streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
                this.snapshotCloseableRegistry.registerCloseable((Closeable)((Object)localOut));
                ArrayList<StateMetaInfoSnapshot> operatorMetaInfoSnapshots = new ArrayList<StateMetaInfoSnapshot>(registeredOperatorStatesDeepCopies.size());
                for (Map.Entry entry : registeredOperatorStatesDeepCopies.entrySet()) {
                    operatorMetaInfoSnapshots.add(((PartitionableListState)entry.getValue()).getStateMetaInfo().snapshot());
                }
                ArrayList<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots = new ArrayList<StateMetaInfoSnapshot>(registeredBroadcastStatesDeepCopies.size());
                for (Map.Entry entry : registeredBroadcastStatesDeepCopies.entrySet()) {
                    broadcastMetaInfoSnapshots.add(((BackendWritableBroadcastState)entry.getValue()).getStateMetaInfo().snapshot());
                }
                DataOutputViewStreamWrapper dataOutputViewStreamWrapper = new DataOutputViewStreamWrapper((OutputStream)((Object)localOut));
                OperatorBackendSerializationProxy operatorBackendSerializationProxy = new OperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);
                operatorBackendSerializationProxy.write((DataOutputView)dataOutputViewStreamWrapper);
                int initialMapCapacity = registeredOperatorStatesDeepCopies.size() + registeredBroadcastStatesDeepCopies.size();
                HashMap<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData = new HashMap<String, OperatorStateHandle.StateMetaInfo>(initialMapCapacity);
                for (Map.Entry entry : registeredOperatorStatesDeepCopies.entrySet()) {
                    value = (PartitionableListState)entry.getValue();
                    partitionOffsets = ((PartitionableListState)value).write(localOut);
                    mode = ((PartitionableListState)value).getStateMetaInfo().getAssignmentMode();
                    writtenStatesMetaData.put((String)entry.getKey(), new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                }
                for (Map.Entry entry : registeredBroadcastStatesDeepCopies.entrySet()) {
                    value = (BackendWritableBroadcastState)entry.getValue();
                    partitionOffsets = new long[]{value.write(localOut)};
                    mode = value.getStateMetaInfo().getAssignmentMode();
                    writtenStatesMetaData.put((String)entry.getKey(), new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                }
                OperatorStreamStateHandle retValue = null;
                if (this.snapshotCloseableRegistry.unregisterCloseable((Closeable)((Object)localOut))) {
                    StreamStateHandle stateHandle = localOut.closeAndGetHandle();
                    if (stateHandle != null) {
                        retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
                    }
                    return SnapshotResult.of(retValue);
                }
                throw new IOException("Stream was already unregistered.");
            }

            @Override
            protected void cleanupProvidedResources() {
            }

            @Override
            protected void logAsyncSnapshotComplete(long startTime) {
                if (DefaultOperatorStateBackendSnapshotStrategy.this.asynchronousSnapshots) {
                    DefaultOperatorStateBackendSnapshotStrategy.this.logAsyncCompleted(streamFactory, startTime);
                }
            }
        };
        AsyncSnapshotCallable.AsyncSnapshotTask asyncSnapshotTask = snapshotCallable.toAsyncSnapshotFutureTask(this.closeStreamOnCancelRegistry);
        if (!this.asynchronousSnapshots) {
            asyncSnapshotTask.run();
        }
        return asyncSnapshotTask;
    }
}

