package io.confluent.ksql.internal;

import com.google.common.base.Ticker;
import io.confluent.ksql.engine.QueryEventListener;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.query.QueryError;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.QueryMetadata;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.streams.KafkaStreams;

/* loaded from: input_file:io/confluent/ksql/internal/QueryStateMetricsReportingListener.class */
public class QueryStateMetricsReportingListener implements QueryEventListener {
    public static final String QUERY_RESTART_METRIC_NAME = "query-restart-total";
    public static final String QUERY_RESTART_METRIC_DESCRIPTION = "The total number of times that a query thread has failed and then been restarted.";
    public static final Ticker CURRENT_TIME_MILLIS_TICKER = new Ticker() { // from class: io.confluent.ksql.internal.QueryStateMetricsReportingListener.1
        public long read() {
            return System.currentTimeMillis();
        }
    };
    private final Metrics metrics;
    private final String metricsPrefix;
    private final Map<String, String> metricsTags;
    private final ConcurrentMap<QueryId, PerQueryListener> perQuery = new ConcurrentHashMap();
    private static final String NO_ERROR = "NO_ERROR";

    /* loaded from: input_file:io/confluent/ksql/internal/QueryStateMetricsReportingListener$PerQueryListener.class */
    private static class PerQueryListener {
        private final Metrics metrics;
        private final MetricName stateMetricName;
        private final MetricName errorMetricName;
        private final MetricName queryRestartMetricName;
        private final MetricName ksqlQueryStatusMetricName;
        private final CumulativeSum queryRestartSum;
        private final Ticker ticker;
        private volatile String state;
        private volatile String ksqlQueryState;
        private volatile String error;

        PerQueryListener(Metrics metrics, String str, String str2, Map<String, String> map) {
            this(metrics, str, str2, QueryStateMetricsReportingListener.CURRENT_TIME_MILLIS_TICKER, map);
        }

        PerQueryListener(Metrics metrics, String str, String str2, Ticker ticker, Map<String, String> map) {
            this.state = "-";
            this.ksqlQueryState = "-";
            this.error = QueryStateMetricsReportingListener.NO_ERROR;
            Objects.requireNonNull(str, "groupPrefix");
            Objects.requireNonNull(str2, "queryId");
            this.metrics = (Metrics) Objects.requireNonNull(metrics, "metrics cannot be null.");
            this.ticker = (Ticker) Objects.requireNonNull(ticker, "ticker");
            String str3 = "_confluent-ksql-" + str + (str2.toLowerCase().contains("transient") ? "transient_" : "query_") + str2;
            HashMap hashMap = new HashMap(map);
            hashMap.put("status", str3);
            this.stateMetricName = metrics.metricName("query-status", str + "ksql-queries", "The current Kafka Streams status of the given query.", hashMap);
            this.errorMetricName = metrics.metricName("error-status", str + "ksql-queries", "The current error status of the given query, if the state is in ERROR state", hashMap);
            HashMap hashMap2 = new HashMap(hashMap);
            hashMap2.put(MetricsTagUtils.KSQL_QUERY_ID_TAG, str2);
            this.queryRestartMetricName = metrics.metricName(QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_NAME, str + "ksql-queries", QueryStateMetricsReportingListener.QUERY_RESTART_METRIC_DESCRIPTION, hashMap2);
            this.ksqlQueryStatusMetricName = metrics.metricName("ksql-query-status", str + "ksql-queries", "The current ksqlDB status of the given query.", hashMap);
            this.queryRestartSum = new CumulativeSum();
            this.metrics.addMetric(this.stateMetricName, (metricConfig, j) -> {
                return this.state;
            });
            this.metrics.addMetric(this.errorMetricName, (metricConfig2, j2) -> {
                return this.error;
            });
            this.metrics.addMetric(this.queryRestartMetricName, this.queryRestartSum);
            this.metrics.addMetric(this.ksqlQueryStatusMetricName, (metricConfig3, j3) -> {
                return this.ksqlQueryState;
            });
        }

        public void onChange(KafkaStreams.State state, KafkaStreams.State state2) {
            this.state = state.toString();
            if (state != KafkaStreams.State.ERROR) {
                this.error = QueryStateMetricsReportingListener.NO_ERROR;
            }
        }

        public void setKsqlQueryState(String str) {
            this.ksqlQueryState = str;
        }

        public void onError(QueryError queryError) {
            this.error = queryError.getType().name();
            this.queryRestartSum.record(new MetricConfig(), 1.0d, System.currentTimeMillis());
        }

        public void onDeregister() {
            this.metrics.removeMetric(this.stateMetricName);
            this.metrics.removeMetric(this.errorMetricName);
            this.metrics.removeMetric(this.queryRestartMetricName);
            this.metrics.removeMetric(this.ksqlQueryStatusMetricName);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public QueryStateMetricsReportingListener(Metrics metrics, String str, Map<String, String> map) {
        this.metrics = (Metrics) Objects.requireNonNull(metrics, "metrics");
        this.metricsPrefix = (String) Objects.requireNonNull(str, "metricGroupPrefix");
        this.metricsTags = (Map) Objects.requireNonNull(map);
    }

    @Override // io.confluent.ksql.engine.QueryEventListener
    public void onCreate(ServiceContext serviceContext, MetaStore metaStore, QueryMetadata queryMetadata) {
        if (this.perQuery.containsKey(queryMetadata.getQueryId())) {
            return;
        }
        this.perQuery.put(queryMetadata.getQueryId(), new PerQueryListener(this.metrics, this.metricsPrefix, queryMetadata.getQueryId().toString(), this.metricsTags));
    }

    @Override // io.confluent.ksql.engine.QueryEventListener
    public void onStateChange(QueryMetadata queryMetadata, KafkaStreams.State state, KafkaStreams.State state2) {
        PerQueryListener perQueryListener = this.perQuery.get(queryMetadata.getQueryId());
        if (perQueryListener != null) {
            perQueryListener.onChange(state, state2);
            perQueryListener.setKsqlQueryState(queryMetadata.getQueryStatus().toString());
        }
    }

    @Override // io.confluent.ksql.engine.QueryEventListener
    public void onKsqlStateChange(QueryMetadata queryMetadata) {
        PerQueryListener perQueryListener = this.perQuery.get(queryMetadata.getQueryId());
        if (perQueryListener != null) {
            perQueryListener.setKsqlQueryState(queryMetadata.getQueryStatus().toString());
        }
    }

    @Override // io.confluent.ksql.engine.QueryEventListener
    public void onError(QueryMetadata queryMetadata, QueryError queryError) {
        PerQueryListener perQueryListener = this.perQuery.get(queryMetadata.getQueryId());
        if (perQueryListener != null) {
            perQueryListener.onError(queryError);
            perQueryListener.setKsqlQueryState(queryMetadata.getQueryStatus().toString());
        }
    }

    @Override // io.confluent.ksql.engine.QueryEventListener
    public void onDeregister(QueryMetadata queryMetadata) {
        this.perQuery.get(queryMetadata.getQueryId()).onDeregister();
        this.perQuery.remove(queryMetadata.getQueryId());
    }
}
