/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.distributed.workermanager;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.Confluent;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.runtime.distributed.ConnectClusterMetrics;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator;
import org.apache.kafka.connect.storage.MetricsStore;
import org.apache.kafka.connect.storage.MetricsStoreFactory;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;

@Confluent
public class WorkerResourceManager {
    private final ResourceManagerMetrics resourceManagerMetrics;
    private MetricsStore metricsStore;
    private Logger log;
    private Time time;
    private final Double cpuThresholdPercentage = 80.0;
    private final Double memoryThresholdPercentage = 80.0;
    private final long totalAvailableHeapMemory;
    private final long rebalanceInterval = 1800000L;
    final int consecutiveImbalanceThreshold = 6;
    private Map<String, Map<ConnectorTaskId, Double>> taskLoad;
    private Map<String, Double> workersCpuLoad;
    private Map<String, Double> workersMemoryLoad;
    private WorkerCoordinator coordinator;
    private ScheduledExecutorService scheduler;
    private Map<String, List<ConnectorTaskId>> latestTaskAssignment;
    Map<String, Integer> numConsecutiveCpuLoadImbalances = new HashMap<String, Integer>();
    Map<String, Integer> numConsecutiveMemoryLoadImbalances = new HashMap<String, Integer>();

    public WorkerResourceManager(LogContext logContext, WorkerCoordinator coordinator, DistributedConfig config, Time time, ConnectMetrics metrics) {
        this.log = logContext.logger(WorkerResourceManager.class);
        this.metricsStore = MetricsStoreFactory.createMetricsStore(config);
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        this.workersCpuLoad = new HashMap<String, Double>();
        this.workersMemoryLoad = new HashMap<String, Double>();
        this.taskLoad = new HashMap<String, Map<ConnectorTaskId, Double>>();
        this.coordinator = coordinator;
        this.totalAvailableHeapMemory = Runtime.getRuntime().maxMemory();
        this.time = time;
        this.resourceManagerMetrics = new ResourceManagerMetrics(metrics);
    }

    WorkerResourceManager(LogContext logContext, WorkerCoordinator coordinator, DistributedConfig config, ScheduledExecutorService executor, long totalHeapMemory, Time time, ConnectMetrics metrics) {
        this.log = logContext.logger(WorkerResourceManager.class);
        this.metricsStore = MetricsStoreFactory.createMetricsStore(config);
        this.scheduler = executor;
        this.workersCpuLoad = new HashMap<String, Double>();
        this.workersMemoryLoad = new HashMap<String, Double>();
        this.taskLoad = new HashMap<String, Map<ConnectorTaskId, Double>>();
        this.coordinator = coordinator;
        this.totalAvailableHeapMemory = totalHeapMemory;
        this.time = time;
        this.resourceManagerMetrics = new ResourceManagerMetrics(metrics);
    }

    public void start() throws Exception {
        this.log.info("Starting Worker Resource Manager");
        try {
            this.metricsStore.start();
        }
        catch (Exception e) {
            this.log.error("Failed to start the metrics store, resource manager won't start", (Throwable)e);
            throw new RuntimeException(e);
        }
        this.getScheduler().scheduleWithFixedDelay(this::startImbalanceDetection, 0L, 5L, TimeUnit.MINUTES);
    }

    public void stop() throws Exception {
        ScheduledExecutorService scheduler = this.getScheduler();
        ThreadUtils.shutdownExecutorServiceQuietly((ExecutorService)scheduler, (long)60L, (TimeUnit)TimeUnit.SECONDS);
        this.metricsStore.stop();
        this.resourceManagerMetrics.close();
        this.log.info("Worker Resource Manager stopped");
    }

    public synchronized void setLatestTaskAssignment(Map<String, List<ConnectorTaskId>> latestTaskAssignment) {
        this.latestTaskAssignment = new ConcurrentHashMap<String, List<ConnectorTaskId>>(latestTaskAssignment);
        this.log.debug("Latest Worker Resource Manager Task Assignment: {}", latestTaskAssignment);
    }

    protected synchronized Map<String, List<ConnectorTaskId>> getLatestTaskAssignment() {
        return this.latestTaskAssignment;
    }

    public synchronized void startImbalanceDetection() {
        this.log.debug("Starting Imbalance Detection");
        this.workersCpuLoad = this.metricsStore.getWorkersCPULoad();
        this.workersMemoryLoad = this.metricsStore.getWorkersMemoryLoad();
        if (this.workersCpuLoad.size() == 1) {
            this.log.info("Only one worker present in the cluster, skipping imbalance detection");
            return;
        }
        for (String workerId : this.workersCpuLoad.keySet()) {
            boolean memoryExceeds;
            if (this.findWorkerWithWorkerIdPrefix(workerId) == null) {
                this.numConsecutiveCpuLoadImbalances.remove(workerId);
                this.numConsecutiveMemoryLoadImbalances.remove(workerId);
                continue;
            }
            double cpuLoad = this.workersCpuLoad.getOrDefault(workerId, 0.0);
            double memoryLoad = this.workersMemoryLoad.getOrDefault(workerId, 0.0) / (double)this.getTotalAvailableHeapMemory() * 100.0;
            boolean cpuExceeds = cpuLoad > this.cpuThresholdPercentage;
            boolean bl = memoryExceeds = memoryLoad > this.memoryThresholdPercentage;
            if (cpuExceeds) {
                this.numConsecutiveCpuLoadImbalances.put(workerId, this.numConsecutiveCpuLoadImbalances.getOrDefault(workerId, 0) + 1);
                this.log.debug("high cpu  {} detected on worker {} for consecutive {} times", new Object[]{cpuLoad, workerId, this.numConsecutiveCpuLoadImbalances.get(workerId)});
            } else {
                this.numConsecutiveCpuLoadImbalances.put(workerId, 0);
            }
            if (memoryExceeds) {
                this.numConsecutiveMemoryLoadImbalances.put(workerId, this.numConsecutiveMemoryLoadImbalances.getOrDefault(workerId, 0) + 1);
                this.log.debug("high memory {} detected on worker {} for consecutive {} times", new Object[]{memoryLoad, workerId, this.numConsecutiveMemoryLoadImbalances.get(workerId)});
            } else {
                this.numConsecutiveMemoryLoadImbalances.put(workerId, 0);
            }
            if (this.numConsecutiveCpuLoadImbalances.get(workerId) < 6 && this.numConsecutiveMemoryLoadImbalances.get(workerId) < 6 || this.taskCountOnWorker(workerId) <= 1) continue;
            this.log.info("Worker {} has exceeded thresholds - CPU: {}%, Memory: {}%, triggering re-balance", new Object[]{workerId, cpuLoad, memoryLoad});
            this.triggerRebalance();
            break;
        }
    }

    public synchronized boolean IsAnyWorkerLoadHigh() {
        for (String workerId : this.workersCpuLoad.keySet()) {
            double cpuLoad = this.workersCpuLoad.getOrDefault(workerId, 0.0);
            double memoryLoad = this.workersMemoryLoad.getOrDefault(workerId, 0.0) / (double)this.getTotalAvailableHeapMemory() * 100.0;
            if (!(cpuLoad > this.cpuThresholdPercentage) && !(memoryLoad > this.memoryThresholdPercentage)) continue;
            return true;
        }
        return false;
    }

    String findWorkerWithWorkerIdPrefix(String workerId) {
        if (this.latestTaskAssignment == null || this.latestTaskAssignment.isEmpty()) {
            return null;
        }
        String workerIdWithPrefix = this.latestTaskAssignment.keySet().stream().filter(key -> key.startsWith(workerId)).findFirst().orElse(null);
        return workerIdWithPrefix;
    }

    protected int taskCountOnWorker(String workerId) {
        this.taskLoad = this.filterRedundantTasks(this.metricsStore.getTasksLoad());
        if (this.taskLoad == null || this.taskLoad.isEmpty()) {
            return 0;
        }
        Map<ConnectorTaskId, Double> workerTaskLoad = this.taskLoad.get(workerId);
        if (workerTaskLoad != null) {
            return workerTaskLoad.size();
        }
        return 0;
    }

    public void stopImbalanceDetection() throws Exception {
        this.stop();
    }

    public void triggerRebalance() {
        this.numConsecutiveCpuLoadImbalances.clear();
        this.numConsecutiveMemoryLoadImbalances.clear();
        this.coordinator.requestRejoin("Trigger re-balance due to worker imbalance");
        this.resourceManagerMetrics.reblanaceTriggered();
    }

    public synchronized ConnectClusterMetrics snapshot() {
        List<ConnectClusterMetrics.WorkerResourceLoad> workerLoad = this.calculateWorkersLoad();
        this.taskLoad = this.filterRedundantTasks(this.metricsStore.getTasksLoad());
        this.log.debug("Metrics store snapshot Task Load:  {}, worker Load: {}", this.taskLoad, workerLoad);
        if (this.taskLoad != null && !this.taskLoad.isEmpty() && workerLoad != null && !workerLoad.isEmpty()) {
            List<ConnectClusterMetrics.TaskLoad> load = this.taskLoad.entrySet().stream().flatMap(entry -> ((Map)entry.getValue()).entrySet().stream().map(taskEntry -> new ConnectClusterMetrics.TaskLoad((ConnectorTaskId)taskEntry.getKey(), (Double)taskEntry.getValue()))).collect(Collectors.toList());
            return new ConnectClusterMetrics(workerLoad, load);
        }
        return new ConnectClusterMetrics(null, null);
    }

    protected List<ConnectClusterMetrics.WorkerResourceLoad> calculateWorkersLoad() {
        if (this.workersMemoryLoad.isEmpty() || this.workersCpuLoad.isEmpty()) {
            this.workersMemoryLoad = new ConcurrentHashMap<String, Double>(this.metricsStore.getWorkersMemoryLoad());
            this.workersCpuLoad = new ConcurrentHashMap<String, Double>(this.metricsStore.getWorkersCPULoad());
        }
        if (this.workersMemoryLoad.isEmpty() || this.workersCpuLoad.isEmpty()) {
            return null;
        }
        Map workersAvailableHeapPercentage = this.workersMemoryLoad.entrySet().stream().collect(HashMap::new, (m, e) -> m.put((String)e.getKey(), ((double)this.getTotalAvailableHeapMemory() - (Double)e.getValue()) / (double)this.getTotalAvailableHeapMemory() * 100.0), HashMap::putAll);
        Map workersAvailableCpuPercentage = this.workersCpuLoad.entrySet().stream().collect(HashMap::new, (m, e) -> m.put((String)e.getKey(), 100.0 - (Double)e.getValue()), HashMap::putAll);
        double totalMemoryAvailability = workersAvailableHeapPercentage.values().stream().mapToDouble(Double::doubleValue).sum();
        Map workersMemoryAvailabilityInCluster = workersAvailableHeapPercentage.entrySet().stream().collect(HashMap::new, (m, e) -> m.put((String)e.getKey(), totalMemoryAvailability == 0.0 ? 0.0 : (Double)e.getValue() / totalMemoryAvailability * 100.0), HashMap::putAll);
        double totalCpuAvailability = workersAvailableCpuPercentage.values().stream().mapToDouble(Double::doubleValue).sum();
        Map workersCPUAvaibiltyInCluster = workersAvailableCpuPercentage.entrySet().stream().collect(HashMap::new, (m, e) -> m.put((String)e.getKey(), totalCpuAvailability == 0.0 ? 0.0 : (Double)e.getValue() / totalCpuAvailability * 100.0), HashMap::putAll);
        HashMap workersEffectiveResourceAvailability = new HashMap();
        workersMemoryAvailabilityInCluster.forEach((workerId, memory) -> {
            double cpu = workersCPUAvaibiltyInCluster.getOrDefault(workerId, 0.0);
            workersEffectiveResourceAvailability.put(workerId, Math.min(memory, cpu));
        });
        return workersEffectiveResourceAvailability.entrySet().stream().map(entry -> new ConnectClusterMetrics.WorkerResourceLoad((String)entry.getKey(), 100.0 - (Double)entry.getValue())).collect(Collectors.toList());
    }

    protected synchronized Map<String, Map<ConnectorTaskId, Double>> filterRedundantTasks(Map<String, Map<ConnectorTaskId, Double>> taskLoad) {
        if (this.latestTaskAssignment == null) {
            return taskLoad;
        }
        HashMap<String, Map<ConnectorTaskId, Double>> filteredTaskLoad = new HashMap<String, Map<ConnectorTaskId, Double>>();
        taskLoad.forEach((workerId, tasks) -> {
            String workerIdWithPrefix = this.findWorkerWithWorkerIdPrefix((String)workerId);
            if (workerIdWithPrefix != null) {
                Map<ConnectorTaskId, Double> filteredTasks = tasks.entrySet().stream().filter(taskEntry -> this.latestTaskAssignment.get(workerIdWithPrefix).contains(taskEntry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
                filteredTaskLoad.put((String)workerId, filteredTasks);
            }
        });
        this.log.debug("Metrics store Task Load:  {} Filtered Task Load: {}", taskLoad, filteredTaskLoad);
        return filteredTaskLoad;
    }

    protected ScheduledExecutorService getScheduler() {
        return this.scheduler;
    }

    protected long getTotalAvailableHeapMemory() {
        return this.totalAvailableHeapMemory;
    }

    protected ResourceManagerMetrics resourceManagerMetrics() {
        return this.resourceManagerMetrics;
    }

    class ResourceManagerMetrics {
        private final ConnectMetrics.MetricGroup metricGroup;
        private final Sensor proactiveRebalanceCompletedCounts;

        public ResourceManagerMetrics(ConnectMetrics connectMetrics) {
            ConnectMetricsRegistry registry = connectMetrics.registry();
            this.metricGroup = connectMetrics.group(registry.workerResourceManagerGroupName(), new String[0]);
            this.proactiveRebalanceCompletedCounts = this.metricGroup.sensor("total-proactive-rebalances");
            this.proactiveRebalanceCompletedCounts.add(this.metricGroup.metricName(registry.proactiveRebalanceCount), (MeasurableStat)new CumulativeSum());
        }

        void close() {
            this.metricGroup.close();
        }

        void reblanaceTriggered() {
            this.proactiveRebalanceCompletedCounts.record(1.0);
        }

        protected ConnectMetrics.MetricGroup metricGroup() {
            return this.metricGroup;
        }
    }
}

