package io.confluent.ksql.internal;

import com.google.common.collect.ImmutableMap;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.QueryEventListener;
import io.confluent.ksql.metrics.MetricCollectors;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.streams.KafkaStreams;

/* loaded from: input_file:io/confluent/ksql/internal/KsqlEngineMetrics.class */
public class KsqlEngineMetrics implements Closeable {
    private static final String DEFAULT_METRIC_GROUP_PREFIX = "ksql-engine";
    private static final String METRIC_GROUP_POST_FIX = "-query-stats";
    private final List<Sensor> sensors;
    private final List<CountMetric> countMetrics;
    private final String metricGroupPrefix;
    private final String metricGroupName;
    private final Sensor messagesIn;
    private final Sensor totalMessagesIn;
    private final Sensor totalBytesIn;
    private final Sensor messagesOut;
    private final Sensor numIdleQueries;
    private final Sensor messageConsumptionByQuery;
    private final Sensor errorRate;
    private final String ksqlServiceIdLegacyPrefix;
    private final String ksqlServicePrefix;
    private final Map<String, String> customMetricsTags;
    private final Map<String, String> newCustomMetricsTags;
    private final Optional<KsqlMetricsExtension> metricsExtension;
    private final KsqlEngine ksqlEngine;
    private final Metrics metrics;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/ksql/internal/KsqlEngineMetrics$CountMetric.class */
    public static class CountMetric {
        private final Gauge<Long> count;
        private final MetricName metricName;

        CountMetric(MetricName metricName, Gauge<Long> gauge) {
            Objects.requireNonNull(metricName, "Metric name cannot be null.");
            Objects.requireNonNull(gauge, "Count gauge cannot be null.");
            this.metricName = metricName;
            this.count = gauge;
        }

        MetricName getMetricName() {
            return this.metricName;
        }

        public Gauge<Long> getCount() {
            return this.count;
        }
    }

    public KsqlEngineMetrics(String str, KsqlEngine ksqlEngine, Map<String, String> map, Optional<KsqlMetricsExtension> optional) {
        this(str.isEmpty() ? DEFAULT_METRIC_GROUP_PREFIX : str, ksqlEngine, MetricCollectors.getMetrics(), map, optional);
    }

    KsqlEngineMetrics(String str, KsqlEngine ksqlEngine, Metrics metrics, Map<String, String> map, Optional<KsqlMetricsExtension> optional) {
        this.ksqlEngine = ksqlEngine;
        this.ksqlServiceIdLegacyPrefix = "_confluent-ksql-" + ksqlEngine.getServiceId();
        this.ksqlServicePrefix = "_confluent-";
        this.sensors = new ArrayList();
        this.countMetrics = new ArrayList();
        this.metricGroupPrefix = (String) Objects.requireNonNull(str, "metricGroupPrefix");
        this.metricGroupName = str + METRIC_GROUP_POST_FIX;
        this.customMetricsTags = map;
        this.newCustomMetricsTags = ImmutableMap.builder().putAll(map).put("ksql_service_id", ksqlEngine.getServiceId()).build();
        this.metricsExtension = optional;
        this.metrics = metrics;
        configureLivenessIndicator();
        configureNumActiveQueries();
        configureNumPersistentQueries();
        this.messagesIn = configureMessagesIn();
        this.totalMessagesIn = configureTotalMessagesIn();
        this.totalBytesIn = configureTotalBytesIn();
        this.messagesOut = configureMessagesOut();
        this.numIdleQueries = configureIdleQueriesSensor();
        this.messageConsumptionByQuery = configureMessageConsumptionByQuerySensor();
        this.errorRate = configureErrorRate();
        Arrays.stream(KafkaStreams.State.values()).forEach(this::configureNumActiveQueriesForGivenState);
        configureCustomMetrics();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.sensors.forEach(sensor -> {
            this.metrics.removeSensor(sensor.name());
        });
        this.countMetrics.forEach(countMetric -> {
            this.metrics.removeMetric(countMetric.getMetricName());
        });
    }

    public void updateMetrics() {
        recordMessagesConsumed(MetricCollectors.currentConsumptionRate());
        recordTotalMessagesConsumed(MetricCollectors.totalMessageConsumption());
        recordTotalBytesConsumed(MetricCollectors.totalBytesConsumption());
        recordMessagesProduced(MetricCollectors.currentProductionRate());
        recordMessageConsumptionByQueryStats(MetricCollectors.currentConsumptionRateByQuery());
        recordErrorRate(MetricCollectors.currentErrorRate());
    }

    @SuppressFBWarnings(value = {"EI_EXPOSE_REP"}, justification = "should be mutable")
    public Metrics getMetrics() {
        return this.metrics;
    }

    List<Sensor> registeredSensors() {
        return this.sensors;
    }

    public QueryEventListener getQueryEventListener() {
        return new QueryStateMetricsReportingListener(this.metrics, this.metricGroupPrefix.equals(DEFAULT_METRIC_GROUP_PREFIX) ? "" : this.metricGroupPrefix);
    }

    private void recordMessageConsumptionByQueryStats(Collection<Double> collection) {
        this.numIdleQueries.record(collection.stream().filter(d -> {
            return d.doubleValue() == 0.0d;
        }).count());
        Sensor sensor = this.messageConsumptionByQuery;
        sensor.getClass();
        collection.forEach((v1) -> {
            r1.record(v1);
        });
    }

    private void recordMessagesProduced(double d) {
        this.messagesOut.record(d);
    }

    private void recordMessagesConsumed(double d) {
        this.messagesIn.record(d);
    }

    private void recordTotalBytesConsumed(double d) {
        this.totalBytesIn.record(d);
    }

    private void recordTotalMessagesConsumed(double d) {
        this.totalMessagesIn.record(d);
    }

    private void recordErrorRate(double d) {
        this.errorRate.record(d);
    }

    private Sensor configureErrorRate() {
        return createSensor(KsqlMetric.of("error-rate", "The number of messages which were consumed but not processed. Messages may not be processed if, for instance, the message contents could not be deserialized due to an incompatible schema. Alternately, a consumed messages may not have been produced, hence being effectively dropped. Such messages would also be counted toward the error rate.", Value::new));
    }

    private Sensor configureMessagesOut() {
        return createSensor(KsqlMetric.of("messages-produced-per-sec", "The number of messages produced per second across all queries", Value::new));
    }

    private Sensor configureMessagesIn() {
        return createSensor(KsqlMetric.of("messages-consumed-per-sec", "The number of messages consumed per second across all queries", Value::new));
    }

    private Sensor configureTotalMessagesIn() {
        return createSensor(KsqlMetric.of("messages-consumed-total", "The total number of messages consumed across all queries", Value::new));
    }

    private Sensor configureTotalBytesIn() {
        return createSensor(KsqlMetric.of("bytes-consumed-total", "The total number of bytes consumed across all queries", Value::new));
    }

    private void configureNumActiveQueries() {
        createSensor(KsqlMetric.of("num-active-queries", "The current number of active queries running in this engine", () -> {
            return new MeasurableStat() { // from class: io.confluent.ksql.internal.KsqlEngineMetrics.1
                public double measure(MetricConfig metricConfig, long j) {
                    return KsqlEngineMetrics.this.ksqlEngine.numberOfLiveQueries();
                }

                public void record(MetricConfig metricConfig, double d, long j) {
                }
            };
        }));
    }

    private void configureNumPersistentQueries() {
        createSensor(KsqlMetric.of("num-persistent-queries", "The current number of persistent queries running in this engine", () -> {
            return new MeasurableStat() { // from class: io.confluent.ksql.internal.KsqlEngineMetrics.2
                public double measure(MetricConfig metricConfig, long j) {
                    return KsqlEngineMetrics.this.ksqlEngine.getPersistentQueries().size();
                }

                public void record(MetricConfig metricConfig, double d, long j) {
                }
            };
        }));
    }

    private Sensor configureIdleQueriesSensor() {
        return createSensor(KsqlMetric.of("num-idle-queries", "Number of inactive queries", Value::new));
    }

    private void configureLivenessIndicator() {
        createSensor(KsqlMetric.of("liveness-indicator", "A metric with constant value 1 indicating the server is up and emitting metrics", () -> {
            return new MeasurableStat() { // from class: io.confluent.ksql.internal.KsqlEngineMetrics.3
                public double measure(MetricConfig metricConfig, long j) {
                    return 1.0d;
                }

                public void record(MetricConfig metricConfig, double d, long j) {
                }
            };
        }));
    }

    private Sensor configureMessageConsumptionByQuerySensor() {
        Sensor createSensor = createSensor("message-consumption-by-query");
        configureMetric(createSensor, KsqlMetric.of("messages-consumed-max", "max msgs consumed by query", Max::new));
        configureMetric(createSensor, KsqlMetric.of("messages-consumed-min", "min msgs consumed by query", Min::new));
        configureMetric(createSensor, KsqlMetric.of("messages-consumed-avg", "mean msgs consumed by query", Avg::new));
        return createSensor;
    }

    private void configureMetric(Sensor sensor, KsqlMetric ksqlMetric) {
        sensor.add(this.metrics.metricName(ksqlMetric.name(), this.ksqlServiceIdLegacyPrefix + this.metricGroupName, ksqlMetric.description(), this.customMetricsTags), ksqlMetric.statSupplier().get());
        sensor.add(this.metrics.metricName(ksqlMetric.name(), this.ksqlServicePrefix + this.metricGroupName, ksqlMetric.description(), this.newCustomMetricsTags), ksqlMetric.statSupplier().get());
    }

    private Sensor createSensor(String str) {
        Sensor sensor = this.metrics.sensor(this.metricGroupName + "-" + str);
        this.sensors.add(sensor);
        return sensor;
    }

    private Sensor createSensor(KsqlMetric ksqlMetric) {
        Sensor createSensor = createSensor(ksqlMetric.name());
        configureMetric(createSensor, ksqlMetric);
        return createSensor;
    }

    private void configureGaugeForState(String str, String str2, Map<String, String> map, KafkaStreams.State state) {
        Gauge gauge = (metricConfig, j) -> {
            return Long.valueOf(this.ksqlEngine.getPersistentQueries().stream().filter(persistentQueryMetadata -> {
                return persistentQueryMetadata.getState().equals(state);
            }).count());
        };
        MetricName metricName = this.metrics.metricName(str, str2, String.format("Count of queries in %s state.", state.toString()), map);
        CountMetric countMetric = new CountMetric(metricName, gauge);
        this.metrics.addMetric(metricName, gauge);
        this.countMetrics.add(countMetric);
    }

    private void configureNumActiveQueriesForGivenState(KafkaStreams.State state) {
        String str = state + "-queries";
        configureGaugeForState(str, this.ksqlServiceIdLegacyPrefix + this.metricGroupName, this.customMetricsTags, state);
        configureGaugeForState(str, this.ksqlServicePrefix + this.metricGroupName, this.newCustomMetricsTags, state);
    }

    private void configureCustomMetrics() {
        if (this.metricsExtension.isPresent()) {
            this.metricsExtension.get().getCustomMetrics().forEach(this::createSensor);
        }
    }
}
