/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.monitor.task;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricFetcherManager;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaBrokerMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaReplicaMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.task.SamplingTask;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoadMonitorTaskRunner {
    private static final Logger LOG = LoggerFactory.getLogger(LoadMonitorTaskRunner.class);
    private final Time time;
    private final MetricFetcherManager metricFetcherManager;
    private final KafkaReplicaMetricSampleAggregator replicaMetricSampleAggregator;
    private final KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator;
    private final KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator;
    private final MetadataClient metadataClient;
    private ScheduledExecutorService samplingScheduler;
    private long samplingIntervalMs;
    private final KafkaCruiseControlConfig config;
    private AtomicReference<LoadMonitorTaskRunnerState> state;
    private volatile boolean awaitingPauseSampling;
    private volatile String reasonOfLatestPauseOrResume;

    public LoadMonitorTaskRunner(KafkaCruiseControlConfig config, KafkaReplicaMetricSampleAggregator replicaMetricSampleAggregator, KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator, MetadataClient metadataClient, Time time, DataBalancerMetricsRegistry metricRegistry, BrokerCapacityConfigResolver brokerCapacityConfigResolver) {
        this(config, new MetricFetcherManager(config, replicaMetricSampleAggregator, partitionMetricSampleAggregator, brokerMetricSampleAggregator, metadataClient, time, metricRegistry, brokerCapacityConfigResolver, null), replicaMetricSampleAggregator, partitionMetricSampleAggregator, brokerMetricSampleAggregator, metadataClient, time);
    }

    LoadMonitorTaskRunner(KafkaCruiseControlConfig config, MetricFetcherManager metricFetcherManager, KafkaReplicaMetricSampleAggregator replicaMetricSampleAggregator, KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator, MetadataClient metadataClient, Time time) {
        this.config = config;
        this.time = time;
        this.metricFetcherManager = metricFetcherManager;
        this.replicaMetricSampleAggregator = replicaMetricSampleAggregator;
        this.partitionMetricSampleAggregator = partitionMetricSampleAggregator;
        this.brokerMetricSampleAggregator = brokerMetricSampleAggregator;
        this.metadataClient = metadataClient;
        this.awaitingPauseSampling = false;
        this.reasonOfLatestPauseOrResume = null;
    }

    public LoadMonitorTaskRunnerState state() {
        return this.state.get();
    }

    public void start() {
        this.metricFetcherManager.start();
        this.samplingScheduler = Executors.newScheduledThreadPool(2, new KafkaCruiseControlThreadFactory("SamplingScheduler", true, LOG));
        this.samplingIntervalMs = this.config.getLong("metric.sampling.interval.ms");
        this.state = new AtomicReference<LoadMonitorTaskRunnerState>(LoadMonitorTaskRunnerState.NOT_STARTED);
        if (!this.state.compareAndSet(LoadMonitorTaskRunnerState.NOT_STARTED, LoadMonitorTaskRunnerState.RUNNING)) {
            throw new IllegalStateException("Cannot start the task runner because the load monitor is in " + (Object)((Object)this.state.get()) + " state.");
        }
        this.samplingScheduler.scheduleAtFixedRate(new SamplingTask(this.samplingIntervalMs, this.metadataClient, this, this.metricFetcherManager, this.time), 0L, this.samplingIntervalMs, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        LOG.info("Shutting down load monitor task runner.");
        KafkaCruiseControlUtils.executeSilently(this.samplingScheduler, KafkaCruiseControlUtils.getExecutorShutdownConsumerWithTimeout(1000L));
        KafkaCruiseControlUtils.executeSilently(this.metricFetcherManager, MetricFetcherManager::shutdown);
        LOG.info("Load monitor task runner shutdown completed.");
    }

    public synchronized void pauseSampling(String reason) {
        if (this.state.get() != LoadMonitorTaskRunnerState.PAUSED && !this.state.compareAndSet(LoadMonitorTaskRunnerState.RUNNING, LoadMonitorTaskRunnerState.PAUSED)) {
            this.awaitingPauseSampling = true;
            throw new IllegalStateException("Cannot pause the load monitor because it is in " + (Object)((Object)this.state.get()) + " state.");
        }
        this.awaitingPauseSampling = false;
        this.reasonOfLatestPauseOrResume = reason;
    }

    public synchronized void resumeSampling(String reason) {
        if (this.state.get() != LoadMonitorTaskRunnerState.RUNNING && !this.state.compareAndSet(LoadMonitorTaskRunnerState.PAUSED, LoadMonitorTaskRunnerState.RUNNING)) {
            throw new IllegalStateException("Cannot resume the load monitor because it is in " + (Object)((Object)this.state.get()) + " state");
        }
        this.reasonOfLatestPauseOrResume = reason;
    }

    public String reasonOfLatestPauseOrResume() {
        return this.reasonOfLatestPauseOrResume;
    }

    public boolean awaitingPauseSampling() {
        return this.awaitingPauseSampling;
    }

    boolean compareAndSetState(LoadMonitorTaskRunnerState expectedState, LoadMonitorTaskRunnerState newState) {
        return this.state.compareAndSet(expectedState, newState);
    }

    void setState(LoadMonitorTaskRunnerState newState) {
        this.state.set(newState);
    }

    public static enum LoadMonitorTaskRunnerState {
        NOT_STARTED,
        RUNNING,
        PAUSED,
        SAMPLING;

    }
}

