package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigFileResolver;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaBrokerMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.Timer;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/MetricFetcherManager.class */
/**
 * Owns the executor on which metric fetchers run, together with the metric sampler and the
 * partition assignor those fetchers use.
 *
 * <p>Lifecycle: construct, call {@link #start()} once to create the executor, the partition
 * assignor, the metrics and (if none was injected) the metric sampler, then call
 * {@link #fetchPartitionMetricSamples(long, long, long, SampleStore)} as needed, and finally
 * {@link #shutdown()}. The fields populated by {@link #start()} are {@code null} before it runs.
 */
public class MetricFetcherManager {
    public static final String BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG = "broker.capacity.config.resolver.object";
    public static final String DEFAULT_BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG = BrokerCapacityConfigFileResolver.class.getCanonicalName();
    private static final Logger LOG = LoggerFactory.getLogger(MetricFetcherManager.class);
    /** Size of the fixed thread pool that runs metric fetchers. */
    static final int SUPPORTED_NUM_METRIC_FETCHER = 1;
    // NOTE(review): not referenced in this class; shutdown() passes a KafkaCruiseControlUtils
    // constant instead. Confirm which timeout is intended.
    static final int SHUTDOWN_TIMEOUT_MS = 5000;
    private final Time time;
    private final KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator;
    private final KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator;
    private final MetadataClient metadataClient;
    // May be injected through the constructor; otherwise instantiated from config in start().
    private MetricSampler metricSampler;
    // The fields below are populated by start().
    private MetricSamplerPartitionAssignor partitionAssignor;
    private ExecutorService samplingExecutor;
    private boolean useLinearRegressionModel;
    private final MetricDef metricDef;
    private Timer samplingFetcherTimer;
    private Meter samplingFetcherFailureRate;
    private final KafkaCruiseControlConfig config;
    private final DataBalancerMetricsRegistry metricRegistry;
    private final BrokerCapacityConfigResolver brokerCapacityConfigResolver;

    /**
     * Stores the collaborators; no threads, metrics or configured instances are created until
     * {@link #start()} is called.
     *
     * @param kafkaCruiseControlConfig configuration used by {@link #start()} to instantiate the
     *        partition assignor and, if needed, the metric sampler
     * @param kafkaPartitionMetricSampleAggregator partition sample aggregator handed to each fetcher
     * @param kafkaBrokerMetricSampleAggregator broker sample aggregator handed to each fetcher
     * @param metadataClient source of the cluster passed to the partition assignor
     * @param metricDef metric definition handed to each fetcher
     * @param time clock used to compute the fetch deadline and elapsed time
     * @param dataBalancerMetricsRegistry registry in which the fetcher timer and failure meter are created
     * @param brokerCapacityConfigResolver passed to a config-instantiated metric sampler under
     *        {@link #BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG}
     * @param metricSampler sampler to use, or {@code null} to have {@link #start()} create one from config
     */
    public MetricFetcherManager(KafkaCruiseControlConfig kafkaCruiseControlConfig, KafkaPartitionMetricSampleAggregator kafkaPartitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator kafkaBrokerMetricSampleAggregator, MetadataClient metadataClient, MetricDef metricDef, Time time, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, BrokerCapacityConfigResolver brokerCapacityConfigResolver, MetricSampler metricSampler) {
        this.config = kafkaCruiseControlConfig;
        this.time = time;
        this.partitionMetricSampleAggregator = kafkaPartitionMetricSampleAggregator;
        this.brokerMetricSampleAggregator = kafkaBrokerMetricSampleAggregator;
        this.metadataClient = metadataClient;
        this.metricDef = metricDef;
        this.metricRegistry = dataBalancerMetricsRegistry;
        this.brokerCapacityConfigResolver = brokerCapacityConfigResolver;
        this.metricSampler = metricSampler;
    }

    /**
     * Creates the sampling executor, the configured partition assignor, the fetcher timer and
     * failure meter, and, when no sampler was injected, the configured metric sampler.
     */
    public void start() {
        this.samplingExecutor = Executors.newFixedThreadPool(SUPPORTED_NUM_METRIC_FETCHER, new KafkaCruiseControlThreadFactory("MetricFetcher", true, LOG));
        this.partitionAssignor = (MetricSamplerPartitionAssignor) this.config.getConfiguredInstance(KafkaCruiseControlConfig.METRIC_SAMPLER_PARTITION_ASSIGNOR_CLASS_CONFIG, MetricSamplerPartitionAssignor.class);
        this.partitionAssignor.configure(this.config.mergedConfigValues());
        this.useLinearRegressionModel = this.config.getBoolean(KafkaCruiseControlConfig.USE_LINEAR_REGRESSION_MODEL_CONFIG);
        this.samplingFetcherTimer = this.metricRegistry.newTimer(MetricFetcherManager.class, "partition-samples-fetcher-timer");
        this.samplingFetcherFailureRate = this.metricRegistry.newMeter(MetricFetcherManager.class, "partition-samples-fetcher-failure-rate", "partition-samples-fetch-failures", TimeUnit.SECONDS);
        if (this.metricSampler == null) {
            // No sampler was injected: build one from config, handing it the capacity resolver
            // as a config override.
            this.metricSampler = (MetricSampler) this.config.getConfiguredInstance(KafkaCruiseControlConfig.METRIC_SAMPLER_CLASS_CONFIG, MetricSampler.class, Collections.singletonMap(BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG, this.brokerCapacityConfigResolver));
        }
    }

    /**
     * Shuts down the sampling executor and closes the metric sampler, both through the
     * "silently" helpers of {@link KafkaCruiseControlUtils}.
     */
    public void shutdown() {
        LOG.info("Shutting down metric fetcher manager.");
        // NOTE(review): SHUTDOWN_TIMEOUT_MS is declared in this class but the timeout passed here
        // is a KafkaCruiseControlUtils constant; verify this is the intended value.
        KafkaCruiseControlUtils.executeSilently(this.samplingExecutor, KafkaCruiseControlUtils.getExecutorShutdownConsumerWithTimeout(KafkaCruiseControlUtils.RECONNECT_BACKOFF_MAX_MS_CONFIG));
        KafkaCruiseControlUtils.closeSilently(this.metricSampler);
        LOG.info("Metric fetcher manager shutdown completed.");
    }

    /**
     * Runs one partition metric sampling pass on the sampling executor and waits for it.
     *
     * @param startTimeMs start of the sampling time range
     * @param endTimeMs end of the sampling time range
     * @param timeoutMs maximum time to wait for the sampling pass to finish, in milliseconds
     * @param sampleStore sample store handed to the fetcher
     * @return the boolean produced by the fetcher, or {@code false} if waiting for it failed
     *         (interruption, execution failure, timeout or any other exception)
     */
    public boolean fetchPartitionMetricSamples(long startTimeMs, long endTimeMs, long timeoutMs, SampleStore sampleStore) {
        LOG.debug("Kicking off partition metric sampling for time range [{}, {}], duration {} ms with timeout {} ms.", startTimeMs, endTimeMs, endTimeMs - startTimeMs, timeoutMs);
        MetricFetcher samplingFetcher = new SamplingFetcher(this.metricSampler, this.metadataClient, this.partitionMetricSampleAggregator, this.brokerMetricSampleAggregator, sampleStore, this.partitionAssignor.assignPartitions(this.metadataClient.cluster()), startTimeMs, endTimeMs, true, this.useLinearRegressionModel, this.metricDef, this.samplingFetcherTimer, this.samplingFetcherFailureRate);
        return fetchSamples(samplingFetcher, timeoutMs);
    }

    /**
     * Submits the fetcher to the sampling executor and blocks until it completes or the timeout
     * elapses. All failures are logged rather than propagated.
     *
     * @param metricFetcher fetcher to run
     * @param timeoutMs maximum time to wait, in milliseconds, measured from entry to this method
     * @return the fetcher's result, or {@code false} if the wait did not complete normally
     */
    private boolean fetchSamples(MetricFetcher metricFetcher, long timeoutMs) {
        boolean result = false;
        long startMs = this.time.milliseconds();
        long deadlineMs = startMs + timeoutMs;
        try {
            // Wait only for what is left of the budget after submission.
            result = (Boolean) this.samplingExecutor.submit(metricFetcher).get(deadlineMs - this.time.milliseconds(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            LOG.warn("Sampling scheduler thread is interrupted when waiting for sampling to finish.", e);
            // Catching InterruptedException clears the interrupt flag; restore it so the caller
            // can still observe the interruption.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            LOG.error("Sampling scheduler received Execution exception when waiting for sampling to finish.", e);
        } catch (TimeoutException e) {
            LOG.error("Sampling scheduler received Timeout exception when waiting for sampling to finish.", e);
        } catch (Exception e) {
            LOG.error("Sampling scheduler received Unknown exception when waiting for sampling to finish.", e);
        }
        LOG.debug("Finished sampling in {} ms.", this.time.milliseconds() - startMs);
        return result;
    }
}
