/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigFileResolver;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricFetcher;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSamplerPartitionAssignor;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.SamplingFetcher;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaBrokerMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaReplicaMetricSampleAggregator;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.Timer;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Submits metric-fetching tasks to a dedicated executor and waits (with a timeout) for them to finish.
 *
 * <p>Lifecycle: construct, call {@link #start()} once to create the executor, partition assignor, metrics
 * and (if not injected) the metric sampler, then call {@link #fetchPartitionMetricSamples(long, long, long)}
 * as needed, and finally {@link #shutdown()}.
 */
public class MetricFetcherManager {
    /** Config key under which the broker capacity resolver instance is handed to a newly created metric sampler. */
    public static final String BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG = "broker.capacity.config.resolver.object";
    /** Canonical class name of {@link BrokerCapacityConfigFileResolver}. */
    public static final String DEFAULT_BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG = BrokerCapacityConfigFileResolver.class.getCanonicalName();
    private static final Logger LOG = LoggerFactory.getLogger(MetricFetcherManager.class);
    /** Number of threads in the sampling executor. */
    static final int SUPPORTED_NUM_METRIC_FETCHER = 1;
    /** Timeout, in milliseconds, passed to the executor shutdown helper in {@link #shutdown()}. */
    static final int SHUTDOWN_TIMEOUT_MS = 5000;
    private final Time time;
    private final KafkaReplicaMetricSampleAggregator replicaMetricSampleAggregator;
    private final KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator;
    private final KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator;
    private final MetadataClient metadataClient;
    // May be injected through the constructor; otherwise created from configuration in start().
    private MetricSampler metricSampler;
    // The fields below are initialized in start(), not in the constructor.
    private MetricSamplerPartitionAssignor partitionAssignor;
    private ExecutorService samplingExecutor;
    private Timer samplingFetcherTimer;
    private Meter samplingFetcherFailureRate;
    private final KafkaCruiseControlConfig config;
    private final DataBalancerMetricsRegistry metricRegistry;
    private final BrokerCapacityConfigResolver brokerCapacityConfigResolver;

    /**
     * Stores the collaborators; no threads or metrics are created until {@link #start()} is called.
     *
     * @param config configuration used in {@link #start()} to instantiate the partition assignor and,
     *               when {@code sampler} is {@code null}, the metric sampler
     * @param replicaMetricSampleAggregator aggregator passed to each sampling fetcher
     * @param partitionMetricSampleAggregator aggregator passed to each sampling fetcher
     * @param brokerMetricSampleAggregator aggregator passed to each sampling fetcher
     * @param metadataClient source of the cluster metadata used for partition assignment
     * @param time clock used to measure the sampling duration and compute the wait deadline
     * @param metricRegistry registry in which the fetcher timer and failure meter are created
     * @param brokerCapacityConfigResolver resolver handed to a config-created metric sampler
     * @param sampler metric sampler to use, or {@code null} to create one from {@code config} in {@link #start()}
     */
    public MetricFetcherManager(KafkaCruiseControlConfig config, KafkaReplicaMetricSampleAggregator replicaMetricSampleAggregator, KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator, MetadataClient metadataClient, Time time, DataBalancerMetricsRegistry metricRegistry, BrokerCapacityConfigResolver brokerCapacityConfigResolver, MetricSampler sampler) {
        this.config = config;
        this.time = time;
        this.replicaMetricSampleAggregator = replicaMetricSampleAggregator;
        this.partitionMetricSampleAggregator = partitionMetricSampleAggregator;
        this.brokerMetricSampleAggregator = brokerMetricSampleAggregator;
        this.metadataClient = metadataClient;
        this.metricRegistry = metricRegistry;
        this.brokerCapacityConfigResolver = brokerCapacityConfigResolver;
        this.metricSampler = sampler;
    }

    /**
     * Creates the sampling executor, the partition assignor, the fetcher timer and failure meter, and,
     * if no sampler was supplied to the constructor, the metric sampler.
     */
    public void start() {
        this.samplingExecutor = Executors.newFixedThreadPool(SUPPORTED_NUM_METRIC_FETCHER, new KafkaCruiseControlThreadFactory("MetricFetcher", true, LOG));
        this.partitionAssignor = this.config.getConfiguredInstance("metric.sampler.partition.assignor.class", MetricSamplerPartitionAssignor.class);
        this.partitionAssignor.configure(this.config.mergedConfigValues());
        this.samplingFetcherTimer = this.metricRegistry.newTimer(MetricFetcherManager.class, "partition-samples-fetcher-timer");
        this.samplingFetcherFailureRate = this.metricRegistry.newMeter(MetricFetcherManager.class, "partition-samples-fetcher-failure-rate", "partition-samples-fetch-failures", TimeUnit.SECONDS);
        if (this.metricSampler == null) {
            // No sampler was injected: build one from configuration and give it the capacity resolver.
            this.metricSampler = this.config.getConfiguredInstance("metric.sampler.class", MetricSampler.class, Collections.singletonMap(BROKER_CAPACITY_CONFIG_RESOLVER_OBJECT_CONFIG, this.brokerCapacityConfigResolver));
        }
    }

    /**
     * Shuts down the sampling executor (using a {@link #SHUTDOWN_TIMEOUT_MS} ms timeout) and then closes
     * the metric sampler. Both steps go through the "silently" helpers of {@link KafkaCruiseControlUtils}.
     */
    public void shutdown() {
        LOG.info("Shutting down metric fetcher manager.");
        KafkaCruiseControlUtils.executeSilently(this.samplingExecutor, KafkaCruiseControlUtils.getExecutorShutdownConsumerWithTimeout((long) SHUTDOWN_TIMEOUT_MS));
        KafkaCruiseControlUtils.closeSilently(this.metricSampler);
        LOG.info("Metric fetcher manager shutdown completed.");
    }

    /**
     * Assigns partitions from the current cluster metadata, builds a {@link SamplingFetcher} for the
     * given time range and runs it on the sampling executor, waiting up to {@code timeoutMs}.
     *
     * @param startMs start of the sampling time range, in milliseconds
     * @param endMs end of the sampling time range, in milliseconds
     * @param timeoutMs maximum time to wait for the fetcher to finish, in milliseconds
     * @return the value returned by the fetcher, or {@code false} if waiting for it was interrupted,
     *         timed out or failed (see {@link #fetchSamples(MetricFetcher, long)})
     */
    public boolean fetchPartitionMetricSamples(long startMs, long endMs, long timeoutMs) {
        LOG.debug("Kicking off partition metric sampling for time range [{}, {}], duration {} ms with timeout {} ms.", startMs, endMs, endMs - startMs, timeoutMs);
        Set<PartitionInfo> partitionAssignment = this.partitionAssignor.assignPartitions(this.metadataClient.cluster());
        SamplingFetcher samplingFetcher = new SamplingFetcher(this.metricSampler, this.metadataClient, this.replicaMetricSampleAggregator, this.partitionMetricSampleAggregator, this.brokerMetricSampleAggregator, partitionAssignment, startMs, endMs, true, this.samplingFetcherTimer, this.samplingFetcherFailureRate);
        return this.fetchSamples(samplingFetcher, timeoutMs);
    }

    /**
     * Submits the fetcher to the sampling executor and blocks until it completes or the timeout elapses.
     *
     * <p>NOTE(review): when the wait is interrupted, times out or throws, this method logs the problem and
     * returns {@code false}, i.e. the same value as a fetcher that reported no sampling error. That
     * behavior is preserved here; confirm with callers whether such failures should be reported instead.
     *
     * @param metricFetcher the task to run
     * @param timeoutMs maximum time to wait for the task, in milliseconds
     * @return the task's result, or {@code false} if no result was obtained
     */
    private boolean fetchSamples(MetricFetcher metricFetcher, long timeoutMs) {
        boolean hasSamplingError = false;
        long samplingActionStartMs = this.time.milliseconds();
        long deadlineMs = samplingActionStartMs + timeoutMs;
        Future<Boolean> errorFuture = this.samplingExecutor.submit(metricFetcher);
        try {
            // Wait only for what is left of the budget after the submit call.
            hasSamplingError = errorFuture.get(deadlineMs - this.time.milliseconds(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            LOG.warn("Sampling scheduler thread is interrupted when waiting for sampling to finish.", e);
            // Restore the interrupt status so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException e) {
            LOG.error("Sampling scheduler received Execution exception when waiting for sampling to finish.", e);
        }
        catch (TimeoutException e) {
            LOG.error("Sampling scheduler received Timeout exception when waiting for sampling to finish.", e);
        }
        catch (Exception e) {
            LOG.error("Sampling scheduler received Unknown exception when waiting for sampling to finish.", e);
        }
        long samplingTime = this.time.milliseconds() - samplingActionStartMs;
        LOG.debug("Finished sampling in {} ms.", samplingTime);
        return hasSamplingError;
    }
}

