package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.exception.MetricSamplingException;
import com.linkedin.kafka.cruisecontrol.model.ModelParameters;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaBrokerMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.core.TimerContext;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/SamplingFetcher.class */
public class SamplingFetcher extends MetricFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(SamplingFetcher.class);
    private final MetricSampler metricSampler;
    private final MetadataClient metadataClient;
    private final KafkaPartitionMetricSampleAggregator partitionMetricSampleAggregator;
    private final KafkaBrokerMetricSampleAggregator brokerMetricSampleAggregator;
    private final SampleStore sampleStore;
    private final Set<TopicPartition> assignedPartitions;
    private final long startTimeMs;
    private final long endTimeMs;
    private final boolean leaderValidation;
    private final boolean useLinearRegressionModel;
    private final Timer fetchTimer;
    private final Meter fetchFailureRate;
    private final MetricDef metricDef;
    private final long timeout;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SamplingFetcher(MetricSampler metricSampler, MetadataClient metadataClient, KafkaPartitionMetricSampleAggregator kafkaPartitionMetricSampleAggregator, KafkaBrokerMetricSampleAggregator kafkaBrokerMetricSampleAggregator, SampleStore sampleStore, Set<TopicPartition> set, long j, long j2, boolean z, boolean z2, MetricDef metricDef, Timer timer, Meter meter) {
        this.metricSampler = metricSampler;
        this.metadataClient = metadataClient;
        this.partitionMetricSampleAggregator = kafkaPartitionMetricSampleAggregator;
        this.brokerMetricSampleAggregator = kafkaBrokerMetricSampleAggregator;
        this.sampleStore = sampleStore;
        this.assignedPartitions = set;
        this.metricDef = metricDef;
        this.startTimeMs = j;
        this.endTimeMs = j2;
        this.leaderValidation = z;
        this.useLinearRegressionModel = z2;
        this.fetchTimer = timer;
        this.fetchFailureRate = meter;
        this.timeout = System.currentTimeMillis() + ((j2 - j) / 2);
    }

    @Override // com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricFetcher
    protected void fetchMetricsForAssignedPartitions() throws MetricSamplingException {
        TimerContext time = this.fetchTimer.time();
        try {
            try {
                this.sampleStore.storeSamples(fetchSamples());
                time.stop();
            } catch (Exception e) {
                this.fetchFailureRate.mark();
                throw e;
            }
        } catch (Throwable th) {
            time.stop();
            throw th;
        }
    }

    private MetricSampler.Samples fetchSamples() throws MetricSamplingException {
        MetricSampler.Samples samples = this.metricSampler.getSamples(this.metadataClient.cluster(), this.assignedPartitions, this.startTimeMs, this.endTimeMs, MetricSampler.SamplingMode.ALL, this.metricDef, this.timeout);
        if (samples == null) {
            samples = MetricSampler.EMPTY_SAMPLES;
        }
        addPartitionSamples(samples.partitionMetricSamples());
        addBrokerMetricSamples(samples.brokerMetricSamples());
        ModelParameters.addMetricObservation(samples.brokerMetricSamples());
        return samples;
    }

    private void addPartitionSamples(Set<PartitionMetricSample> set) {
        HashSet hashSet = new HashSet(this.assignedPartitions.size());
        if (set == null) {
            LOG.warn("Failed to collect partition metric samples for {} assigned partitions", Integer.valueOf(this.assignedPartitions.size()));
            return;
        }
        int i = 0;
        Iterator<PartitionMetricSample> it = set.iterator();
        while (it.hasNext()) {
            PartitionMetricSample next = it.next();
            TopicPartition tp = next.entity().tp();
            if (this.assignedPartitions.contains(tp)) {
                if (this.useLinearRegressionModel && ModelParameters.trainingCompleted()) {
                    next.record(KafkaMetricDef.commonMetricDef().metricInfo(KafkaMetricDef.CPU_USAGE.name()), SamplingUtils.estimateLeaderCpuUtilUsingLinearRegressionModel(next));
                }
                next.close(this.endTimeMs);
                if (this.partitionMetricSampleAggregator.addSample(next, this.leaderValidation)) {
                    LOG.trace("Enqueued partition metric sample {}", next);
                } else {
                    it.remove();
                    i++;
                    LOG.trace("Failed to add partition metric sample {}", next);
                }
                hashSet.add(tp);
            } else {
                LOG.warn("Collected partition metric sample for partition {} which is not an assigned partition. The metric sample will be ignored.", tp);
            }
        }
        LOG.debug("Collected {} ({} discarded) partition metric samples for {} partitions. Total partition assigned: {}.", new Object[]{Integer.valueOf(set.size()), Integer.valueOf(i), Integer.valueOf(hashSet.size()), Integer.valueOf(this.assignedPartitions.size())});
    }

    private void addBrokerMetricSamples(Set<BrokerMetricSample> set) {
        HashSet hashSet = new HashSet();
        if (set == null) {
            LOG.warn("Failed to collect broker metrics samples.");
            return;
        }
        int i = 0;
        Iterator<BrokerMetricSample> it = set.iterator();
        while (it.hasNext()) {
            BrokerMetricSample next = it.next();
            next.close(this.endTimeMs);
            if (this.brokerMetricSampleAggregator.addSample(next)) {
                LOG.trace("Enqueued broker metric sample {}", next);
            } else {
                it.remove();
                i++;
                LOG.trace("Failed to add broker metric sample {}", next);
            }
            hashSet.add(Integer.valueOf(next.brokerId()));
        }
        LOG.debug("Collected {} ({} discarded) broker metric samples for {} brokers.", new Object[]{Integer.valueOf(set.size()), Integer.valueOf(i), Integer.valueOf(hashSet.size())});
    }
}
