/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.heap;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.AsyncSnapshotStrategySynchronicityBehavior;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.heap.HeapRestoreOperation;
import org.apache.flink.runtime.state.heap.HeapSnapshotStrategy;
import org.apache.flink.runtime.state.heap.SnapshotStrategySynchronicityBehavior;
import org.apache.flink.runtime.state.heap.StateTable;
import org.apache.flink.runtime.state.heap.SyncSnapshotStrategySynchronicityBehavior;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

/**
 * Builder that assembles a {@link HeapKeyedStateBackend} and runs a {@link HeapRestoreOperation}
 * against it before handing it to the caller.
 *
 * @param <K> type of the key handled by the backend being built
 */
public class HeapKeyedStateBackendBuilder<K> extends AbstractKeyedStateBackendBuilder<K> {
    /** Local recovery settings, passed to both the snapshot strategy and the backend. */
    private final LocalRecoveryConfig localRecoveryConfig;
    /** Priority queue set factory, passed to both the backend and the restore operation. */
    private final HeapPriorityQueueSetFactory priorityQueueSetFactory;
    /** Chooses between the async ({@code true}) and sync ({@code false}) snapshot behavior. */
    private final boolean asynchronousSnapshots;

    /**
     * Creates a builder. Every argument except {@code localRecoveryConfig},
     * {@code priorityQueueSetFactory} and {@code asynchronousSnapshots} is forwarded unchanged to
     * the {@link AbstractKeyedStateBackendBuilder} constructor.
     *
     * @param kvStateRegistry forwarded to the superclass
     * @param keySerializer forwarded to the superclass
     * @param userCodeClassLoader forwarded to the superclass
     * @param numberOfKeyGroups forwarded to the superclass
     * @param keyGroupRange forwarded to the superclass
     * @param executionConfig forwarded to the superclass
     * @param ttlTimeProvider forwarded to the superclass
     * @param stateHandles forwarded to the superclass; presumably the handles the backend is
     *     restored from (verify against the superclass)
     * @param keyGroupCompressionDecorator forwarded to the superclass
     * @param localRecoveryConfig local recovery settings stored on this builder
     * @param priorityQueueSetFactory priority queue set factory stored on this builder
     * @param asynchronousSnapshots {@code true} to use
     *     {@link AsyncSnapshotStrategySynchronicityBehavior}, {@code false} to use
     *     {@link SyncSnapshotStrategySynchronicityBehavior}
     * @param cancelStreamRegistry forwarded to the superclass; later handed to the restore
     *     operation in {@link #build()}
     */
    public HeapKeyedStateBackendBuilder(TaskKvStateRegistry kvStateRegistry, TypeSerializer<K> keySerializer, ClassLoader userCodeClassLoader, int numberOfKeyGroups, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, TtlTimeProvider ttlTimeProvider, @Nonnull Collection<KeyedStateHandle> stateHandles, StreamCompressionDecorator keyGroupCompressionDecorator, LocalRecoveryConfig localRecoveryConfig, HeapPriorityQueueSetFactory priorityQueueSetFactory, boolean asynchronousSnapshots, CloseableRegistry cancelStreamRegistry) {
        super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider, stateHandles, keyGroupCompressionDecorator, cancelStreamRegistry);
        this.localRecoveryConfig = localRecoveryConfig;
        this.priorityQueueSetFactory = priorityQueueSetFactory;
        this.asynchronousSnapshots = asynchronousSnapshots;
    }

    /**
     * Creates the backend together with its snapshot strategy and then runs the restore
     * operation on it.
     *
     * @return the backend, after {@link HeapRestoreOperation#restore()} completed without throwing
     * @throws BackendBuildingException if the restore operation throws; the freshly created
     *     backend is disposed before this exception is raised and the original failure is kept as
     *     the cause
     */
    @Override
    public HeapKeyedStateBackend<K> build() throws BackendBuildingException {
        // The same two map instances are handed to the snapshot strategy, the backend and the
        // restore operation.
        Map<String, StateTable<K, ?, ?>> registeredKVStates = new HashMap<String, StateTable<K, ?, ?>>();
        Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates = new HashMap<String, HeapPriorityQueueSnapshotRestoreWrapper>();

        // The backend and its snapshot strategy get a fresh registry; the registry supplied to
        // this builder is only passed to the restore operation below.
        CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();

        HeapSnapshotStrategy<K> snapshotStrategy = this.initSnapshotStrategy(this.asynchronousSnapshots, registeredKVStates, registeredPQStates, cancelStreamRegistryForBackend);
        HeapKeyedStateBackend<K> backend = new HeapKeyedStateBackend<K>(this.kvStateRegistry, this.keySerializerProvider, this.userCodeClassLoader, this.numberOfKeyGroups, this.keyGroupRange, this.executionConfig, this.ttlTimeProvider, cancelStreamRegistryForBackend, this.keyGroupCompressionDecorator, registeredKVStates, registeredPQStates, this.localRecoveryConfig, this.priorityQueueSetFactory, snapshotStrategy);
        HeapRestoreOperation<K> restoreOperation = new HeapRestoreOperation<K>(this.restoreStateHandles, this.keySerializerProvider, this.userCodeClassLoader, registeredKVStates, registeredPQStates, this.cancelStreamRegistry, this.priorityQueueSetFactory, this.keyGroupRange, this.numberOfKeyGroups, snapshotStrategy, backend);
        try {
            restoreOperation.restore();
        }
        catch (Exception e) {
            // Dispose the backend created above before reporting the failure.
            backend.dispose();
            throw new BackendBuildingException("Failed when trying to restore heap backend", e);
        }
        return backend;
    }

    /**
     * Creates the snapshot strategy for the backend under construction.
     *
     * @param asynchronousSnapshots {@code true} selects
     *     {@link AsyncSnapshotStrategySynchronicityBehavior}, {@code false} selects
     *     {@link SyncSnapshotStrategySynchronicityBehavior}
     * @param registeredKVStates map of state tables passed to the strategy
     * @param registeredPQStates map of priority queue wrappers passed to the strategy
     * @param cancelStreamRegistry registry passed to the strategy
     * @return a new {@link HeapSnapshotStrategy} built from the arguments and this builder's
     *     compression decorator, local recovery config, key group range and key serializer
     *     provider
     */
    private HeapSnapshotStrategy<K> initSnapshotStrategy(boolean asynchronousSnapshots, Map<String, StateTable<K, ?, ?>> registeredKVStates, Map<String, HeapPriorityQueueSnapshotRestoreWrapper> registeredPQStates, CloseableRegistry cancelStreamRegistry) {
        // Explicit <K> type arguments keep the key type checked by the compiler instead of
        // falling back to raw types and unchecked conversions.
        SnapshotStrategySynchronicityBehavior<K> synchronicityTrait = asynchronousSnapshots
                ? new AsyncSnapshotStrategySynchronicityBehavior<K>()
                : new SyncSnapshotStrategySynchronicityBehavior<K>();
        return new HeapSnapshotStrategy<K>(synchronicityTrait, registeredKVStates, registeredPQStates, this.keyGroupCompressionDecorator, this.localRecoveryConfig, this.keyGroupRange, cancelStreamRegistry, this.keySerializerProvider);
    }
}

