package io.confluent.ksql.internal;

import com.google.common.base.Ticker;
import io.confluent.ksql.engine.QueryEventListener;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.query.QueryError;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.QueryMetadata;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.streams.KafkaStreams;

/* loaded from: input_file:io/confluent/ksql/internal/QueryStateMetricsReportingListener.class */
public class QueryStateMetricsReportingListener implements QueryEventListener {
    public static final Ticker CURRENT_TIME_MILLIS_TICKER = new Ticker() { // from class: io.confluent.ksql.internal.QueryStateMetricsReportingListener.1
        public long read() {
            return System.currentTimeMillis();
        }
    };
    private final Metrics metrics;
    private final String metricsPrefix;
    private final ConcurrentMap<QueryId, PerQueryListener> perQuery = new ConcurrentHashMap();
    private static final String NO_ERROR = "NO_ERROR";

    /* loaded from: input_file:io/confluent/ksql/internal/QueryStateMetricsReportingListener$PerQueryListener.class */
    private static class PerQueryListener {
        private final Metrics metrics;
        private final MetricName stateMetricName;
        private final MetricName errorMetricName;
        private final Ticker ticker;
        private volatile String state;
        private volatile String error;

        PerQueryListener(Metrics metrics, String str, String str2) {
            this(metrics, str, str2, QueryStateMetricsReportingListener.CURRENT_TIME_MILLIS_TICKER);
        }

        PerQueryListener(Metrics metrics, String str, String str2, Ticker ticker) {
            this.state = "-";
            this.error = QueryStateMetricsReportingListener.NO_ERROR;
            Objects.requireNonNull(str, "groupPrefix");
            Objects.requireNonNull(str2, "queryApplicationId");
            this.metrics = (Metrics) Objects.requireNonNull(metrics, "metrics cannot be null.");
            this.ticker = (Ticker) Objects.requireNonNull(ticker, "ticker");
            this.stateMetricName = metrics.metricName("query-status", str + "ksql-queries", "The current status of the given query.", Collections.singletonMap("status", str2));
            this.errorMetricName = metrics.metricName("error-status", str + "ksql-queries", "The current error status of the given query, if the state is in ERROR state", Collections.singletonMap("status", str2));
            this.metrics.addMetric(this.stateMetricName, (metricConfig, j) -> {
                return this.state;
            });
            this.metrics.addMetric(this.errorMetricName, (metricConfig2, j2) -> {
                return this.error;
            });
        }

        public void onChange(KafkaStreams.State state, KafkaStreams.State state2) {
            this.state = state.toString();
            if (state != KafkaStreams.State.ERROR) {
                this.error = QueryStateMetricsReportingListener.NO_ERROR;
            }
        }

        public void onError(QueryError queryError) {
            this.error = queryError.getType().name();
        }

        public void onDeregister() {
            this.metrics.removeMetric(this.stateMetricName);
            this.metrics.removeMetric(this.errorMetricName);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public QueryStateMetricsReportingListener(Metrics metrics, String str) {
        this.metrics = (Metrics) Objects.requireNonNull(metrics, "metrics");
        this.metricsPrefix = (String) Objects.requireNonNull(str, "metricGroupPrefix");
    }

    @Override // io.confluent.ksql.engine.QueryEventListener
    public void onCreate(ServiceContext serviceContext, MetaStore metaStore, QueryMetadata queryMetadata) {
        if (this.perQuery.containsKey(queryMetadata.getQueryId())) {
            return;
        }
        this.perQuery.put(queryMetadata.getQueryId(), new PerQueryListener(this.metrics, this.metricsPrefix, queryMetadata.getQueryApplicationId()));
    }

    @Override // io.confluent.ksql.engine.QueryEventListener
    public void onStateChange(QueryMetadata queryMetadata, KafkaStreams.State state, KafkaStreams.State state2) {
        PerQueryListener perQueryListener = this.perQuery.get(queryMetadata.getQueryId());
        if (perQueryListener != null) {
            perQueryListener.onChange(state, state2);
        }
    }

    @Override // io.confluent.ksql.engine.QueryEventListener
    public void onError(QueryMetadata queryMetadata, QueryError queryError) {
        PerQueryListener perQueryListener = this.perQuery.get(queryMetadata.getQueryId());
        if (perQueryListener != null) {
            perQueryListener.onError(queryError);
        }
    }

    @Override // io.confluent.ksql.engine.QueryEventListener
    public void onDeregister(QueryMetadata queryMetadata) {
        this.perQuery.get(queryMetadata.getQueryId()).onDeregister();
        this.perQuery.remove(queryMetadata.getQueryId());
    }
}
