/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.clusterframework;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.common.resources.Resource;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpecBuilder;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskExecutorProcessUtils {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorProcessUtils.class);

    private TaskExecutorProcessUtils() {
    }

    public static String generateJvmParametersStr(TaskExecutorProcessSpec taskExecutorProcessSpec) {
        MemorySize jvmHeapSize = taskExecutorProcessSpec.getJvmHeapMemorySize();
        MemorySize jvmDirectSize = taskExecutorProcessSpec.getJvmDirectMemorySize();
        MemorySize jvmMetaspaceSize = taskExecutorProcessSpec.getJvmMetaspaceSize();
        return "-Xmx" + jvmHeapSize.getBytes() + " -Xms" + jvmHeapSize.getBytes() + " -XX:MaxDirectMemorySize=" + jvmDirectSize.getBytes() + " -XX:MaxMetaspaceSize=" + jvmMetaspaceSize.getBytes();
    }

    public static String generateDynamicConfigsStr(TaskExecutorProcessSpec taskExecutorProcessSpec) {
        HashMap<String, String> configs = new HashMap<String, String>();
        configs.put(TaskManagerOptions.CPU_CORES.key(), String.valueOf(taskExecutorProcessSpec.getCpuCores().getValue().doubleValue()));
        configs.put(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(), taskExecutorProcessSpec.getFrameworkHeapSize().getBytes() + "b");
        configs.put(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key(), taskExecutorProcessSpec.getFrameworkOffHeapMemorySize().getBytes() + "b");
        configs.put(TaskManagerOptions.TASK_HEAP_MEMORY.key(), taskExecutorProcessSpec.getTaskHeapSize().getBytes() + "b");
        configs.put(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key(), taskExecutorProcessSpec.getTaskOffHeapSize().getBytes() + "b");
        configs.put(TaskManagerOptions.NETWORK_MEMORY_MIN.key(), taskExecutorProcessSpec.getNetworkMemSize().getBytes() + "b");
        configs.put(TaskManagerOptions.NETWORK_MEMORY_MAX.key(), taskExecutorProcessSpec.getNetworkMemSize().getBytes() + "b");
        configs.put(TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), taskExecutorProcessSpec.getManagedMemorySize().getBytes() + "b");
        return TaskExecutorProcessUtils.assembleDynamicConfigsStr(configs);
    }

    private static String assembleDynamicConfigsStr(Map<String, String> configs) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : configs.entrySet()) {
            sb.append("-D ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
        }
        return sb.toString();
    }

    public static List<ResourceProfile> createDefaultWorkerSlotProfiles(TaskExecutorProcessSpec taskExecutorProcessSpec, int numberOfSlots) {
        ResourceProfile resourceProfile = TaskExecutorProcessUtils.generateDefaultSlotResourceProfile(taskExecutorProcessSpec, numberOfSlots);
        return Collections.nCopies(numberOfSlots, resourceProfile);
    }

    public static ResourceProfile generateDefaultSlotResourceProfile(TaskExecutorProcessSpec taskExecutorProcessSpec, int numberOfSlots) {
        return ResourceProfile.newBuilder().setCpuCores(taskExecutorProcessSpec.getCpuCores().divide(numberOfSlots)).setTaskHeapMemory(taskExecutorProcessSpec.getTaskHeapSize().divide((long)numberOfSlots)).setTaskOffHeapMemory(taskExecutorProcessSpec.getTaskOffHeapSize().divide((long)numberOfSlots)).setManagedMemory(taskExecutorProcessSpec.getManagedMemorySize().divide((long)numberOfSlots)).setNetworkMemory(taskExecutorProcessSpec.getNetworkMemSize().divide((long)numberOfSlots)).build();
    }

    public static ResourceProfile generateTotalAvailableResourceProfile(TaskExecutorProcessSpec taskExecutorProcessSpec) {
        return ResourceProfile.newBuilder().setCpuCores((Resource)taskExecutorProcessSpec.getCpuCores()).setTaskHeapMemory(taskExecutorProcessSpec.getTaskHeapSize()).setTaskOffHeapMemory(taskExecutorProcessSpec.getTaskOffHeapSize()).setManagedMemory(taskExecutorProcessSpec.getManagedMemorySize()).setNetworkMemory(taskExecutorProcessSpec.getNetworkMemSize()).build();
    }

    public static TaskExecutorProcessSpecBuilder newProcessSpecBuilder(Configuration config) {
        return TaskExecutorProcessSpecBuilder.newBuilder(config);
    }

    public static TaskExecutorProcessSpec processSpecFromConfig(Configuration config) {
        if (TaskExecutorProcessUtils.isTaskHeapMemorySizeExplicitlyConfigured(config) && TaskExecutorProcessUtils.isManagedMemorySizeExplicitlyConfigured(config)) {
            return TaskExecutorProcessUtils.deriveProcessSpecWithExplicitTaskAndManagedMemory(config);
        }
        if (TaskExecutorProcessUtils.isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
            return TaskExecutorProcessUtils.deriveProcessSpecWithTotalFlinkMemory(config);
        }
        if (TaskExecutorProcessUtils.isTotalProcessMemorySizeExplicitlyConfigured(config)) {
            return TaskExecutorProcessUtils.deriveProcessSpecWithTotalProcessMemory(config);
        }
        throw new IllegalConfigurationException(String.format("Either Task Heap Memory size (%s) and Managed Memory size (%s), or Total Flink Memory size (%s), or Total Process Memory size (%s) need to be configured explicitly.", TaskManagerOptions.TASK_HEAP_MEMORY.key(), TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), TaskManagerOptions.TOTAL_FLINK_MEMORY.key(), TaskManagerOptions.TOTAL_PROCESS_MEMORY.key()));
    }

    public static boolean isTaskExecutorProcessResourceExplicitlyConfigured(Configuration config) {
        return TaskExecutorProcessUtils.isTaskHeapMemorySizeExplicitlyConfigured(config) && TaskExecutorProcessUtils.isManagedMemorySizeExplicitlyConfigured(config) || TaskExecutorProcessUtils.isTotalFlinkMemorySizeExplicitlyConfigured(config) || TaskExecutorProcessUtils.isTotalProcessMemorySizeExplicitlyConfigured(config);
    }

    private static TaskExecutorProcessSpec deriveProcessSpecWithExplicitTaskAndManagedMemory(Configuration config) {
        MemorySize networkMemorySize;
        MemorySize taskHeapMemorySize = TaskExecutorProcessUtils.getTaskHeapMemorySize(config);
        MemorySize managedMemorySize = TaskExecutorProcessUtils.getManagedMemorySize(config);
        MemorySize frameworkHeapMemorySize = TaskExecutorProcessUtils.getFrameworkHeapMemorySize(config);
        MemorySize frameworkOffHeapMemorySize = TaskExecutorProcessUtils.getFrameworkOffHeapMemorySize(config);
        MemorySize taskOffHeapMemorySize = TaskExecutorProcessUtils.getTaskOffHeapMemorySize(config);
        MemorySize totalFlinkExcludeNetworkMemorySize = frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize);
        if (TaskExecutorProcessUtils.isTotalFlinkMemorySizeExplicitlyConfigured(config)) {
            MemorySize totalFlinkMemorySize = TaskExecutorProcessUtils.getTotalFlinkMemorySize(config);
            if (totalFlinkExcludeNetworkMemorySize.getBytes() > totalFlinkMemorySize.getBytes()) {
                throw new IllegalConfigurationException("Sum of configured Framework Heap Memory (" + frameworkHeapMemorySize.toHumanReadableString() + "), Framework Off-Heap Memory (" + frameworkOffHeapMemorySize.toHumanReadableString() + "), Task Heap Memory (" + taskHeapMemorySize.toHumanReadableString() + "), Task Off-Heap Memory (" + taskOffHeapMemorySize.toHumanReadableString() + ") and Managed Memory (" + managedMemorySize.toHumanReadableString() + ") exceed configured Total Flink Memory (" + totalFlinkMemorySize.toHumanReadableString() + ").");
            }
            networkMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeNetworkMemorySize);
            TaskExecutorProcessUtils.sanityCheckNetworkMemoryWithExplicitlySetTotalFlinkAndHeapMemory(config, networkMemorySize, totalFlinkMemorySize);
        } else {
            networkMemorySize = TaskExecutorProcessUtils.isUsingLegacyNetworkConfigs(config) ? TaskExecutorProcessUtils.getNetworkMemorySizeWithLegacyConfig(config) : TaskExecutorProcessUtils.deriveNetworkMemoryWithInverseFraction(config, totalFlinkExcludeNetworkMemorySize);
        }
        FlinkInternalMemory flinkInternalMemory = new FlinkInternalMemory(frameworkHeapMemorySize, frameworkOffHeapMemorySize, taskHeapMemorySize, taskOffHeapMemorySize, networkMemorySize, managedMemorySize);
        TaskExecutorProcessUtils.sanityCheckTotalFlinkMemory(config, flinkInternalMemory);
        JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead = TaskExecutorProcessUtils.deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(config, flinkInternalMemory.getTotalFlinkMemorySize());
        return TaskExecutorProcessUtils.createTaskExecutorProcessSpec(config, flinkInternalMemory, jvmMetaspaceAndOverhead);
    }

    private static TaskExecutorProcessSpec deriveProcessSpecWithTotalFlinkMemory(Configuration config) {
        MemorySize totalFlinkMemorySize = TaskExecutorProcessUtils.getTotalFlinkMemorySize(config);
        FlinkInternalMemory flinkInternalMemory = TaskExecutorProcessUtils.deriveInternalMemoryFromTotalFlinkMemory(config, totalFlinkMemorySize);
        JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead = TaskExecutorProcessUtils.deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(config, totalFlinkMemorySize);
        return TaskExecutorProcessUtils.createTaskExecutorProcessSpec(config, flinkInternalMemory, jvmMetaspaceAndOverhead);
    }

    private static TaskExecutorProcessSpec deriveProcessSpecWithTotalProcessMemory(Configuration config) {
        MemorySize jvmOverheadSize;
        MemorySize totalProcessMemorySize = TaskExecutorProcessUtils.getTotalProcessMemorySize(config);
        MemorySize jvmMetaspaceSize = TaskExecutorProcessUtils.getJvmMetaspaceSize(config);
        JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead = new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize = TaskExecutorProcessUtils.deriveJvmOverheadWithFraction(config, totalProcessMemorySize));
        if (jvmMetaspaceAndOverhead.getTotalJvmMetaspaceAndOverheadSize().getBytes() > totalProcessMemorySize.getBytes()) {
            throw new IllegalConfigurationException("Sum of configured JVM Metaspace (" + jvmMetaspaceAndOverhead.metaspace.toHumanReadableString() + ") and JVM Overhead (" + jvmMetaspaceAndOverhead.overhead.toHumanReadableString() + ") exceed configured Total Process Memory (" + totalProcessMemorySize.toHumanReadableString() + ").");
        }
        MemorySize totalFlinkMemorySize = totalProcessMemorySize.subtract(jvmMetaspaceAndOverhead.getTotalJvmMetaspaceAndOverheadSize());
        FlinkInternalMemory flinkInternalMemory = TaskExecutorProcessUtils.deriveInternalMemoryFromTotalFlinkMemory(config, totalFlinkMemorySize);
        return TaskExecutorProcessUtils.createTaskExecutorProcessSpec(config, flinkInternalMemory, jvmMetaspaceAndOverhead);
    }

    private static JvmMetaspaceAndOverhead deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory(Configuration config, MemorySize totalFlinkMemorySize) {
        JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead;
        MemorySize jvmMetaspaceSize = TaskExecutorProcessUtils.getJvmMetaspaceSize(config);
        MemorySize totalFlinkAndJvmMetaspaceSize = totalFlinkMemorySize.add(jvmMetaspaceSize);
        if (TaskExecutorProcessUtils.isTotalProcessMemorySizeExplicitlyConfigured(config)) {
            MemorySize totalProcessMemorySize = TaskExecutorProcessUtils.getTotalProcessMemorySize(config);
            MemorySize jvmOverheadSize = totalProcessMemorySize.subtract(totalFlinkAndJvmMetaspaceSize);
            TaskExecutorProcessUtils.sanityCheckJvmOverhead(config, jvmOverheadSize, totalProcessMemorySize);
            jvmMetaspaceAndOverhead = new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize);
        } else {
            MemorySize jvmOverheadSize = TaskExecutorProcessUtils.deriveJvmOverheadWithInverseFraction(config, totalFlinkAndJvmMetaspaceSize);
            jvmMetaspaceAndOverhead = new JvmMetaspaceAndOverhead(jvmMetaspaceSize, jvmOverheadSize);
            TaskExecutorProcessUtils.sanityCheckTotalProcessMemory(config, totalFlinkMemorySize, jvmMetaspaceAndOverhead);
        }
        return jvmMetaspaceAndOverhead;
    }

    private static FlinkInternalMemory deriveInternalMemoryFromTotalFlinkMemory(Configuration config, MemorySize totalFlinkMemorySize) {
        MemorySize networkMemorySize;
        MemorySize managedMemorySize;
        MemorySize taskHeapMemorySize;
        MemorySize frameworkHeapMemorySize = TaskExecutorProcessUtils.getFrameworkHeapMemorySize(config);
        MemorySize frameworkOffHeapMemorySize = TaskExecutorProcessUtils.getFrameworkOffHeapMemorySize(config);
        MemorySize taskOffHeapMemorySize = TaskExecutorProcessUtils.getTaskOffHeapMemorySize(config);
        if (TaskExecutorProcessUtils.isTaskHeapMemorySizeExplicitlyConfigured(config)) {
            taskHeapMemorySize = TaskExecutorProcessUtils.getTaskHeapMemorySize(config);
            managedMemorySize = TaskExecutorProcessUtils.deriveManagedMemoryAbsoluteOrWithFraction(config, totalFlinkMemorySize);
            MemorySize totalFlinkExcludeNetworkMemorySize = frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize);
            if (totalFlinkExcludeNetworkMemorySize.getBytes() > totalFlinkMemorySize.getBytes()) {
                throw new IllegalConfigurationException("Sum of configured Framework Heap Memory (" + frameworkHeapMemorySize.toHumanReadableString() + "), Framework Off-Heap Memory (" + frameworkOffHeapMemorySize.toHumanReadableString() + "), Task Heap Memory (" + taskHeapMemorySize.toHumanReadableString() + "), Task Off-Heap Memory (" + taskOffHeapMemorySize.toHumanReadableString() + ") and Managed Memory (" + managedMemorySize.toHumanReadableString() + ") exceed configured Total Flink Memory (" + totalFlinkMemorySize.toHumanReadableString() + ").");
            }
            networkMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeNetworkMemorySize);
            TaskExecutorProcessUtils.sanityCheckNetworkMemoryWithExplicitlySetTotalFlinkAndHeapMemory(config, networkMemorySize, totalFlinkMemorySize);
        } else {
            managedMemorySize = TaskExecutorProcessUtils.deriveManagedMemoryAbsoluteOrWithFraction(config, totalFlinkMemorySize);
            networkMemorySize = TaskExecutorProcessUtils.isUsingLegacyNetworkConfigs(config) ? TaskExecutorProcessUtils.getNetworkMemorySizeWithLegacyConfig(config) : TaskExecutorProcessUtils.deriveNetworkMemoryWithFraction(config, totalFlinkMemorySize);
            MemorySize totalFlinkExcludeTaskHeapMemorySize = frameworkHeapMemorySize.add(frameworkOffHeapMemorySize).add(taskOffHeapMemorySize).add(managedMemorySize).add(networkMemorySize);
            if (totalFlinkExcludeTaskHeapMemorySize.getBytes() > totalFlinkMemorySize.getBytes()) {
                throw new IllegalConfigurationException("Sum of configured Framework Heap Memory (" + frameworkHeapMemorySize.toHumanReadableString() + "), Framework Off-Heap Memory (" + frameworkOffHeapMemorySize.toHumanReadableString() + "), Task Off-Heap Memory (" + taskOffHeapMemorySize.toHumanReadableString() + "), Managed Memory (" + managedMemorySize.toHumanReadableString() + ") and Network Memory (" + networkMemorySize.toHumanReadableString() + ") exceed configured Total Flink Memory (" + totalFlinkMemorySize.toHumanReadableString() + ").");
            }
            taskHeapMemorySize = totalFlinkMemorySize.subtract(totalFlinkExcludeTaskHeapMemorySize);
        }
        FlinkInternalMemory flinkInternalMemory = new FlinkInternalMemory(frameworkHeapMemorySize, frameworkOffHeapMemorySize, taskHeapMemorySize, taskOffHeapMemorySize, networkMemorySize, managedMemorySize);
        TaskExecutorProcessUtils.sanityCheckTotalFlinkMemory(config, flinkInternalMemory);
        return flinkInternalMemory;
    }

    private static MemorySize deriveManagedMemoryAbsoluteOrWithFraction(Configuration config, MemorySize base) {
        if (TaskExecutorProcessUtils.isManagedMemorySizeExplicitlyConfigured(config)) {
            return TaskExecutorProcessUtils.getManagedMemorySize(config);
        }
        return TaskExecutorProcessUtils.deriveWithFraction("managed memory", base, TaskExecutorProcessUtils.getManagedMemoryRangeFraction(config));
    }

    private static MemorySize deriveNetworkMemoryWithFraction(Configuration config, MemorySize base) {
        return TaskExecutorProcessUtils.deriveWithFraction("network memory", base, TaskExecutorProcessUtils.getNetworkMemoryRangeFraction(config));
    }

    private static MemorySize deriveNetworkMemoryWithInverseFraction(Configuration config, MemorySize base) {
        return TaskExecutorProcessUtils.deriveWithInverseFraction("network memory", base, TaskExecutorProcessUtils.getNetworkMemoryRangeFraction(config));
    }

    private static MemorySize deriveJvmOverheadWithFraction(Configuration config, MemorySize base) {
        return TaskExecutorProcessUtils.deriveWithFraction("jvm overhead memory", base, TaskExecutorProcessUtils.getJvmOverheadRangeFraction(config));
    }

    private static MemorySize deriveJvmOverheadWithInverseFraction(Configuration config, MemorySize base) {
        return TaskExecutorProcessUtils.deriveWithInverseFraction("jvm overhead memory", base, TaskExecutorProcessUtils.getJvmOverheadRangeFraction(config));
    }

    private static MemorySize deriveWithFraction(String memoryDescription, MemorySize base, RangeFraction rangeFraction) {
        MemorySize relative = base.multiply(rangeFraction.fraction);
        return TaskExecutorProcessUtils.capToMinMax(memoryDescription, relative, rangeFraction);
    }

    private static MemorySize deriveWithInverseFraction(String memoryDescription, MemorySize base, RangeFraction rangeFraction) {
        Preconditions.checkArgument((rangeFraction.fraction < 1.0 ? 1 : 0) != 0);
        MemorySize relative = base.multiply(rangeFraction.fraction / (1.0 - rangeFraction.fraction));
        return TaskExecutorProcessUtils.capToMinMax(memoryDescription, relative, rangeFraction);
    }

    private static MemorySize capToMinMax(String memoryDescription, MemorySize relative, RangeFraction rangeFraction) {
        long size = relative.getBytes();
        if (size > rangeFraction.maxSize.getBytes()) {
            LOG.info("The derived from fraction {} ({}) is greater than its max value {}, max value will be used instead", new Object[]{memoryDescription, relative.toHumanReadableString(), rangeFraction.maxSize.toHumanReadableString()});
            size = rangeFraction.maxSize.getBytes();
        } else if (size < rangeFraction.minSize.getBytes()) {
            LOG.info("The derived from fraction {} ({}) is less than its min value {}, min value will be used instead", new Object[]{memoryDescription, relative.toHumanReadableString(), rangeFraction.minSize.toHumanReadableString()});
            size = rangeFraction.minSize.getBytes();
        }
        return new MemorySize(size);
    }

    private static MemorySize getFrameworkHeapMemorySize(Configuration config) {
        return TaskExecutorProcessUtils.getMemorySizeFromConfig(config, (ConfigOption<MemorySize>)TaskManagerOptions.FRAMEWORK_HEAP_MEMORY);
    }

    private static MemorySize getFrameworkOffHeapMemorySize(Configuration config) {
        return TaskExecutorProcessUtils.getMemorySizeFromConfig(config, (ConfigOption<MemorySize>)TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY);
    }

    private static MemorySize getTaskHeapMemorySize(Configuration config) {
        Preconditions.checkArgument((boolean)TaskExecutorProcessUtils.isTaskHeapMemorySizeExplicitlyConfigured(config));
        return TaskExecutorProcessUtils.getMemorySizeFromConfig(config, (ConfigOption<MemorySize>)TaskManagerOptions.TASK_HEAP_MEMORY);
    }

    private static MemorySize getTaskOffHeapMemorySize(Configuration config) {
        return TaskExecutorProcessUtils.getMemorySizeFromConfig(config, (ConfigOption<MemorySize>)TaskManagerOptions.TASK_OFF_HEAP_MEMORY);
    }

    private static MemorySize getManagedMemorySize(Configuration config) {
        Preconditions.checkArgument((boolean)TaskExecutorProcessUtils.isManagedMemorySizeExplicitlyConfigured(config));
        return TaskExecutorProcessUtils.getMemorySizeFromConfig(config, (ConfigOption<MemorySize>)TaskManagerOptions.MANAGED_MEMORY_SIZE);
    }

    private static RangeFraction getManagedMemoryRangeFraction(Configuration config) {
        return TaskExecutorProcessUtils.getRangeFraction(MemorySize.ZERO, MemorySize.MAX_VALUE, (ConfigOption<Float>)TaskManagerOptions.MANAGED_MEMORY_FRACTION, config);
    }

    private static MemorySize getNetworkMemorySizeWithLegacyConfig(Configuration config) {
        Preconditions.checkArgument((boolean)TaskExecutorProcessUtils.isUsingLegacyNetworkConfigs(config));
        long numOfBuffers = config.getInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS);
        long pageSize = ConfigurationParserUtils.getPageSize(config);
        return new MemorySize(numOfBuffers * pageSize);
    }

    private static RangeFraction getNetworkMemoryRangeFraction(Configuration config) {
        MemorySize minSize = TaskExecutorProcessUtils.getMemorySizeFromConfig(config, (ConfigOption<MemorySize>)TaskManagerOptions.NETWORK_MEMORY_MIN);
        MemorySize maxSize = TaskExecutorProcessUtils.getMemorySizeFromConfig(config, (ConfigOption<MemorySize>)TaskManagerOptions.NETWORK_MEMORY_MAX);
        return TaskExecutorProcessUtils.getRangeFraction(minSize, maxSize, (ConfigOption<Float>)TaskManagerOptions.NETWORK_MEMORY_FRACTION, config);
    }

    private static MemorySize getJvmMetaspaceSize(Configuration config) {
        return TaskExecutorProcessUtils.getMemorySizeFromConfig(config, (ConfigOption<MemorySize>)TaskManagerOptions.JVM_METASPACE);
    }

    private static RangeFraction getJvmOverheadRangeFraction(Configuration config) {
        MemorySize minSize = TaskExecutorProcessUtils.getMemorySizeFromConfig(config, (ConfigOption<MemorySize>)TaskManagerOptions.JVM_OVERHEAD_MIN);
        MemorySize maxSize = TaskExecutorProcessUtils.getMemorySizeFromConfig(config, (ConfigOption<MemorySize>)TaskManagerOptions.JVM_OVERHEAD_MAX);
        return TaskExecutorProcessUtils.getRangeFraction(minSize, maxSize, (ConfigOption<Float>)TaskManagerOptions.JVM_OVERHEAD_FRACTION, config);
    }

    private static RangeFraction getRangeFraction(MemorySize minSize, MemorySize maxSize, ConfigOption<Float> fractionOption, Configuration config) {
        double fraction = config.getFloat(fractionOption);
        try {
            return new RangeFraction(minSize, maxSize, fraction);
        }
        catch (IllegalArgumentException e) {
            throw new IllegalConfigurationException(String.format("Inconsistently configured %s (%s) and its min (%s), max (%s) value", fractionOption, fraction, minSize.toHumanReadableString(), maxSize.toHumanReadableString()), (Throwable)e);
        }
    }

    private static MemorySize getTotalFlinkMemorySize(Configuration config) {
        Preconditions.checkArgument((boolean)TaskExecutorProcessUtils.isTotalFlinkMemorySizeExplicitlyConfigured(config));
        return TaskExecutorProcessUtils.getMemorySizeFromConfig(config, (ConfigOption<MemorySize>)TaskManagerOptions.TOTAL_FLINK_MEMORY);
    }

    private static MemorySize getTotalProcessMemorySize(Configuration config) {
        Preconditions.checkArgument((boolean)TaskExecutorProcessUtils.isTotalProcessMemorySizeExplicitlyConfigured(config));
        return TaskExecutorProcessUtils.getMemorySizeFromConfig(config, (ConfigOption<MemorySize>)TaskManagerOptions.TOTAL_PROCESS_MEMORY);
    }

    private static MemorySize getMemorySizeFromConfig(Configuration config, ConfigOption<MemorySize> option) {
        try {
            return (MemorySize)config.get(option);
        }
        catch (Throwable t) {
            throw new IllegalConfigurationException("Cannot read memory size from config option '" + option.key() + "'.", t);
        }
    }

    private static boolean isTaskHeapMemorySizeExplicitlyConfigured(Configuration config) {
        return config.contains(TaskManagerOptions.TASK_HEAP_MEMORY);
    }

    public static boolean isManagedMemorySizeExplicitlyConfigured(Configuration config) {
        return config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE);
    }

    private static boolean isUsingLegacyNetworkConfigs(Configuration config) {
        boolean legacyConfigured = config.contains(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS);
        return !config.contains(TaskManagerOptions.NETWORK_MEMORY_MIN) && !config.contains(TaskManagerOptions.NETWORK_MEMORY_MAX) && !config.contains(TaskManagerOptions.NETWORK_MEMORY_FRACTION) && legacyConfigured;
    }

    private static boolean isNetworkMemoryFractionExplicitlyConfigured(Configuration config) {
        return config.contains(TaskManagerOptions.NETWORK_MEMORY_FRACTION);
    }

    public static boolean isNetworkMemoryExplicitlyConfigured(Configuration config) {
        boolean legacyConfigured = config.contains(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS);
        return config.contains(TaskManagerOptions.NETWORK_MEMORY_MAX) || config.contains(TaskManagerOptions.NETWORK_MEMORY_MIN) || config.contains(TaskManagerOptions.NETWORK_MEMORY_FRACTION) || legacyConfigured;
    }

    private static boolean isJvmOverheadFractionExplicitlyConfigured(Configuration config) {
        return config.contains(TaskManagerOptions.JVM_OVERHEAD_FRACTION);
    }

    private static boolean isTotalFlinkMemorySizeExplicitlyConfigured(Configuration config) {
        return config.contains(TaskManagerOptions.TOTAL_FLINK_MEMORY);
    }

    private static boolean isTotalProcessMemorySizeExplicitlyConfigured(Configuration config) {
        return config.contains(TaskManagerOptions.TOTAL_PROCESS_MEMORY);
    }

    private static void sanityCheckTotalFlinkMemory(Configuration config, FlinkInternalMemory flinkInternalMemory) {
        MemorySize configuredTotalFlinkMemorySize;
        if (TaskExecutorProcessUtils.isTotalFlinkMemorySizeExplicitlyConfigured(config) && !(configuredTotalFlinkMemorySize = TaskExecutorProcessUtils.getTotalFlinkMemorySize(config)).equals((Object)flinkInternalMemory.getTotalFlinkMemorySize())) {
            throw new IllegalConfigurationException("Configured/Derived Flink internal memory sizes (total " + flinkInternalMemory.getTotalFlinkMemorySize().toHumanReadableString() + ") do not add up to the configured Total Flink Memory size (" + configuredTotalFlinkMemorySize.toHumanReadableString() + "). Configured/Derived Flink internal memory sizes are: Framework Heap Memory (" + flinkInternalMemory.frameworkHeap.toHumanReadableString() + "), Framework Off-Heap Memory (" + flinkInternalMemory.frameworkOffHeap.toHumanReadableString() + "), Task Heap Memory (" + flinkInternalMemory.taskHeap.toHumanReadableString() + "), Task Off-Heap Memory (" + flinkInternalMemory.taskOffHeap.toHumanReadableString() + "), Network Memory (" + flinkInternalMemory.network.toHumanReadableString() + "), Managed Memory (" + flinkInternalMemory.managed.toHumanReadableString() + ").");
        }
    }

    private static void sanityCheckTotalProcessMemory(Configuration config, MemorySize totalFlinkMemory, JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead) {
        MemorySize configuredTotalProcessMemorySize;
        MemorySize derivedTotalProcessMemorySize = totalFlinkMemory.add(jvmMetaspaceAndOverhead.metaspace).add(jvmMetaspaceAndOverhead.overhead);
        if (TaskExecutorProcessUtils.isTotalProcessMemorySizeExplicitlyConfigured(config) && !(configuredTotalProcessMemorySize = TaskExecutorProcessUtils.getTotalProcessMemorySize(config)).equals((Object)derivedTotalProcessMemorySize)) {
            throw new IllegalConfigurationException("Configured/Derived memory sizes (total " + derivedTotalProcessMemorySize.toHumanReadableString() + ") do not add up to the configured Total Process Memory size (" + configuredTotalProcessMemorySize.toHumanReadableString() + "). Configured/Derived memory sizes are: Total Flink Memory (" + totalFlinkMemory.toHumanReadableString() + "), JVM Metaspace (" + jvmMetaspaceAndOverhead.metaspace.toHumanReadableString() + "), JVM Overhead (" + jvmMetaspaceAndOverhead.overhead.toHumanReadableString() + ").");
        }
    }

    private static void sanityCheckNetworkMemoryWithExplicitlySetTotalFlinkAndHeapMemory(Configuration config, MemorySize derivedNetworkMemorySize, MemorySize totalFlinkMemorySize) {
        try {
            TaskExecutorProcessUtils.sanityCheckNetworkMemory(config, derivedNetworkMemorySize, totalFlinkMemorySize);
        }
        catch (IllegalConfigurationException e) {
            throw new IllegalConfigurationException("If Total Flink, Task Heap and (or) Managed Memory sizes are explicitly configured then the Network Memory size is the rest of the Total Flink memory after subtracting all other configured types of memory, but the derived Network Memory is inconsistent with its configuration.", (Throwable)e);
        }
    }

    private static void sanityCheckNetworkMemory(Configuration config, MemorySize derivedNetworkMemorySize, MemorySize totalFlinkMemorySize) {
        if (TaskExecutorProcessUtils.isUsingLegacyNetworkConfigs(config)) {
            MemorySize configuredNetworkMemorySize = TaskExecutorProcessUtils.getNetworkMemorySizeWithLegacyConfig(config);
            if (!configuredNetworkMemorySize.equals((Object)derivedNetworkMemorySize)) {
                throw new IllegalConfigurationException("Derived Network Memory size (" + derivedNetworkMemorySize.toHumanReadableString() + ") does not match configured Network Memory size (" + configuredNetworkMemorySize.toHumanReadableString() + ").");
            }
        } else {
            RangeFraction networkRangeFraction = TaskExecutorProcessUtils.getNetworkMemoryRangeFraction(config);
            if (derivedNetworkMemorySize.getBytes() > networkRangeFraction.maxSize.getBytes() || derivedNetworkMemorySize.getBytes() < networkRangeFraction.minSize.getBytes()) {
                throw new IllegalConfigurationException("Derived Network Memory size (" + derivedNetworkMemorySize.toHumanReadableString() + ") is not in configured Network Memory range [" + networkRangeFraction.minSize.toHumanReadableString() + ", " + networkRangeFraction.maxSize.toHumanReadableString() + "].");
            }
            if (TaskExecutorProcessUtils.isNetworkMemoryFractionExplicitlyConfigured(config) && !derivedNetworkMemorySize.equals((Object)totalFlinkMemorySize.multiply(networkRangeFraction.fraction))) {
                LOG.info("The derived Network Memory size ({}) does not match the configured Network Memory fraction ({}) from the configured Total Flink Memory size ({}). The derived Network Memory size will be used.", new Object[]{derivedNetworkMemorySize.toHumanReadableString(), networkRangeFraction.fraction, totalFlinkMemorySize.toHumanReadableString()});
            }
        }
    }

    private static void sanityCheckJvmOverhead(Configuration config, MemorySize derivedJvmOverheadSize, MemorySize totalProcessMemorySize) {
        RangeFraction jvmOverheadRangeFraction = TaskExecutorProcessUtils.getJvmOverheadRangeFraction(config);
        if (derivedJvmOverheadSize.getBytes() > jvmOverheadRangeFraction.maxSize.getBytes() || derivedJvmOverheadSize.getBytes() < jvmOverheadRangeFraction.minSize.getBytes()) {
            throw new IllegalConfigurationException("Derived JVM Overhead size (" + derivedJvmOverheadSize.toHumanReadableString() + ") is not in configured JVM Overhead range [" + jvmOverheadRangeFraction.minSize.toHumanReadableString() + ", " + jvmOverheadRangeFraction.maxSize.toHumanReadableString() + "].");
        }
        if (TaskExecutorProcessUtils.isJvmOverheadFractionExplicitlyConfigured(config) && !derivedJvmOverheadSize.equals((Object)totalProcessMemorySize.multiply(jvmOverheadRangeFraction.fraction))) {
            LOG.info("The derived JVM Overhead size ({}) does not match the configured JVM Overhead fraction ({}) from the configured Total Process Memory size ({}). The derived JVM OVerhead size will be used.", new Object[]{derivedJvmOverheadSize.toHumanReadableString(), jvmOverheadRangeFraction.fraction, totalProcessMemorySize.toHumanReadableString()});
        }
    }

    private static CPUResource getCpuCores(Configuration config) {
        return TaskExecutorProcessUtils.getCpuCoresWithFallback(config, -1.0);
    }

    public static double getCpuCoresWithFallbackConfigOption(Configuration config, ConfigOption<Double> fallbackOption) {
        double fallbackValue = config.getDouble(fallbackOption);
        return TaskExecutorProcessUtils.getCpuCoresWithFallback(config, fallbackValue).getValue().doubleValue();
    }

    public static CPUResource getCpuCoresWithFallback(Configuration config, double fallback) {
        double cpuCores = config.contains(TaskManagerOptions.CPU_CORES) ? config.getDouble(TaskManagerOptions.CPU_CORES) : (fallback > 0.0 ? fallback : (double)config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS));
        if (cpuCores <= 0.0) {
            throw new IllegalConfigurationException(String.format("TaskExecutors need to be started with a positive number of CPU cores. Please configure %s accordingly.", TaskManagerOptions.CPU_CORES.key()));
        }
        return new CPUResource(cpuCores);
    }

    private static TaskExecutorProcessSpec createTaskExecutorProcessSpec(Configuration config, FlinkInternalMemory flinkInternalMemory, JvmMetaspaceAndOverhead jvmMetaspaceAndOverhead) {
        return new TaskExecutorProcessSpec(TaskExecutorProcessUtils.getCpuCores(config), flinkInternalMemory.frameworkHeap, flinkInternalMemory.frameworkOffHeap, flinkInternalMemory.taskHeap, flinkInternalMemory.taskOffHeap, flinkInternalMemory.network, flinkInternalMemory.managed, jvmMetaspaceAndOverhead.metaspace, jvmMetaspaceAndOverhead.overhead);
    }

    public static Configuration getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(Configuration configuration, ConfigOption<MemorySize> configOption) {
        if (configuration.contains(configOption)) {
            return configuration;
        }
        return TaskExecutorProcessUtils.getLegacyTaskManagerHeapMemoryIfExplicitlyConfigured(configuration).map(legacyHeapSize -> {
            Configuration copiedConfig = new Configuration(configuration);
            copiedConfig.set(configOption, legacyHeapSize);
            LOG.info("'{}' is not specified, use the configured deprecated task manager heap value ({}) for it.", (Object)configOption.key(), (Object)legacyHeapSize.toHumanReadableString());
            return copiedConfig;
        }).orElse(configuration);
    }

    private static Optional<MemorySize> getLegacyTaskManagerHeapMemoryIfExplicitlyConfigured(Configuration configuration) {
        String totalProcessEnv = System.getenv("FLINK_TM_HEAP");
        if (totalProcessEnv != null) {
            try {
                return Optional.of(MemorySize.parse((String)totalProcessEnv));
            }
            catch (Throwable t) {
                throw new IllegalConfigurationException("Cannot read total process memory size from environment variable value " + totalProcessEnv + ".", t);
            }
        }
        if (configuration.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)) {
            return Optional.of(TaskExecutorProcessUtils.getMemorySizeFromConfig(configuration, (ConfigOption<MemorySize>)TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY));
        }
        if (configuration.contains(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB)) {
            long legacyHeapMemoryMB = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB);
            if (legacyHeapMemoryMB < 0L) {
                throw new IllegalConfigurationException("Configured total process memory size (" + legacyHeapMemoryMB + "MB) must not be less than 0.");
            }
            return Optional.of(new MemorySize(legacyHeapMemoryMB << 20));
        }
        return Optional.empty();
    }

    private static class JvmMetaspaceAndOverhead {
        final MemorySize metaspace;
        final MemorySize overhead;

        JvmMetaspaceAndOverhead(MemorySize jvmMetaspace, MemorySize jvmOverhead) {
            this.metaspace = (MemorySize)Preconditions.checkNotNull((Object)jvmMetaspace);
            this.overhead = (MemorySize)Preconditions.checkNotNull((Object)jvmOverhead);
        }

        MemorySize getTotalJvmMetaspaceAndOverheadSize() {
            return this.metaspace.add(this.overhead);
        }
    }

    private static class FlinkInternalMemory {
        final MemorySize frameworkHeap;
        final MemorySize frameworkOffHeap;
        final MemorySize taskHeap;
        final MemorySize taskOffHeap;
        final MemorySize network;
        final MemorySize managed;

        FlinkInternalMemory(MemorySize frameworkHeap, MemorySize frameworkOffHeap, MemorySize taskHeap, MemorySize taskOffHeap, MemorySize network, MemorySize managed) {
            this.frameworkHeap = (MemorySize)Preconditions.checkNotNull((Object)frameworkHeap);
            this.frameworkOffHeap = (MemorySize)Preconditions.checkNotNull((Object)frameworkOffHeap);
            this.taskHeap = (MemorySize)Preconditions.checkNotNull((Object)taskHeap);
            this.taskOffHeap = (MemorySize)Preconditions.checkNotNull((Object)taskOffHeap);
            this.network = (MemorySize)Preconditions.checkNotNull((Object)network);
            this.managed = (MemorySize)Preconditions.checkNotNull((Object)managed);
        }

        MemorySize getTotalFlinkMemorySize() {
            return this.frameworkHeap.add(this.frameworkOffHeap).add(this.taskHeap).add(this.taskOffHeap).add(this.network).add(this.managed);
        }
    }

    private static class RangeFraction {
        final MemorySize minSize;
        final MemorySize maxSize;
        final double fraction;

        RangeFraction(MemorySize minSize, MemorySize maxSize, double fraction) {
            this.minSize = minSize;
            this.maxSize = maxSize;
            this.fraction = fraction;
            Preconditions.checkArgument((minSize.getBytes() <= maxSize.getBytes() ? 1 : 0) != 0, (Object)"min value must be less or equal to max value");
            Preconditions.checkArgument((fraction >= 0.0 && fraction < 1.0 ? 1 : 0) != 0, (Object)"fraction must be in range [0, 1)");
        }
    }
}

