package io.confluent.ksql.util;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.internal.QueryStateListener;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.KafkaStreamsBuilder;
import io.confluent.ksql.query.QueryError;
import io.confluent.ksql.query.QueryErrorClassifier;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KsqlConstants;
import java.lang.Thread;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/util/QueryMetadata.class */
public abstract class QueryMetadata {
    private static final Logger LOG = LoggerFactory.getLogger(QueryMetadata.class);
    private final String statementString;
    private final String executionPlan;
    private final String queryApplicationId;
    private final Topology topology;
    private final KafkaStreamsBuilder kafkaStreamsBuilder;
    private final Map<String, Object> streamsProperties;
    private final Map<String, Object> overriddenProperties;
    protected final Consumer<QueryMetadata> closeCallback;
    private final Set<SourceName> sourceNames;
    private final LogicalSchema logicalSchema;
    private final Long closeTimeout;
    private final QueryId queryId;
    private final QueryErrorClassifier errorClassifier;
    private final Queue<QueryError> queryErrors;
    private Optional<QueryStateListener> queryStateListener;
    private boolean everStarted;
    protected boolean closed;
    private Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
    private KafkaStreams kafkaStreams;
    private Consumer<Boolean> onStop;
    private boolean initialized;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public QueryMetadata(String str, LogicalSchema logicalSchema, Set<SourceName> set, String str2, String str3, Topology topology, KafkaStreamsBuilder kafkaStreamsBuilder, Map<String, Object> map, Map<String, Object> map2, Consumer<QueryMetadata> consumer, long j, QueryId queryId, QueryErrorClassifier queryErrorClassifier, int i) {
        this.queryStateListener = Optional.empty();
        this.everStarted = false;
        this.closed = false;
        this.uncaughtExceptionHandler = this::uncaughtHandler;
        this.onStop = bool -> {
        };
        this.initialized = false;
        this.statementString = (String) Objects.requireNonNull(str, "statementString");
        this.executionPlan = (String) Objects.requireNonNull(str2, "executionPlan");
        this.queryApplicationId = (String) Objects.requireNonNull(str3, "queryApplicationId");
        this.topology = (Topology) Objects.requireNonNull(topology, "kafkaTopicClient");
        this.kafkaStreamsBuilder = (KafkaStreamsBuilder) Objects.requireNonNull(kafkaStreamsBuilder, "kafkaStreamsBuilder");
        this.streamsProperties = ImmutableMap.copyOf((Map) Objects.requireNonNull(map, "streamsPropeties"));
        this.overriddenProperties = ImmutableMap.copyOf((Map) Objects.requireNonNull(map2, "overriddenProperties"));
        this.closeCallback = (Consumer) Objects.requireNonNull(consumer, "closeCallback");
        this.sourceNames = (Set) Objects.requireNonNull(set, "sourceNames");
        this.logicalSchema = (LogicalSchema) Objects.requireNonNull(logicalSchema, "logicalSchema");
        this.closeTimeout = Long.valueOf(j);
        this.queryId = (QueryId) Objects.requireNonNull(queryId, "queryId");
        this.errorClassifier = (QueryErrorClassifier) Objects.requireNonNull(queryErrorClassifier, "errorClassifier");
        this.queryErrors = EvictingQueue.create(i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public QueryMetadata(QueryMetadata queryMetadata, Consumer<QueryMetadata> consumer) {
        this.queryStateListener = Optional.empty();
        this.everStarted = false;
        this.closed = false;
        this.uncaughtExceptionHandler = this::uncaughtHandler;
        this.onStop = bool -> {
        };
        this.initialized = false;
        this.statementString = queryMetadata.statementString;
        this.kafkaStreams = queryMetadata.kafkaStreams;
        this.executionPlan = queryMetadata.executionPlan;
        this.queryApplicationId = queryMetadata.queryApplicationId;
        this.topology = queryMetadata.topology;
        this.kafkaStreamsBuilder = queryMetadata.kafkaStreamsBuilder;
        this.streamsProperties = queryMetadata.streamsProperties;
        this.overriddenProperties = queryMetadata.overriddenProperties;
        this.sourceNames = queryMetadata.sourceNames;
        this.logicalSchema = queryMetadata.logicalSchema;
        this.closeCallback = (Consumer) Objects.requireNonNull(consumer, "closeCallback");
        this.closeTimeout = queryMetadata.closeTimeout;
        this.queryId = queryMetadata.queryId;
        this.errorClassifier = queryMetadata.errorClassifier;
        this.uncaughtExceptionHandler = queryMetadata.uncaughtExceptionHandler;
        this.queryStateListener = queryMetadata.queryStateListener;
        this.everStarted = queryMetadata.everStarted;
        this.queryErrors = queryMetadata.queryErrors;
    }

    public void initialize() {
        this.kafkaStreams = this.kafkaStreamsBuilder.build(this.topology, this.streamsProperties);
        this.kafkaStreams.setUncaughtExceptionHandler(this::uncaughtHandler);
        this.initialized = true;
    }

    public void setQueryStateListener(QueryStateListener queryStateListener) {
        this.queryStateListener = Optional.of(queryStateListener);
        this.kafkaStreams.setStateListener(queryStateListener);
        queryStateListener.onChange(this.kafkaStreams.state(), this.kafkaStreams.state());
    }

    public void onStop(Consumer<Boolean> consumer) {
        this.onStop = consumer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void uncaughtHandler(Thread thread, Throwable th) {
        QueryError.Type type = QueryError.Type.UNKNOWN;
        try {
            try {
                type = this.errorClassifier.classify(th);
                QueryError queryError = new QueryError(System.currentTimeMillis(), Throwables.getStackTraceAsString(th), type);
                this.queryStateListener.ifPresent(queryStateListener -> {
                    queryStateListener.onError(queryError);
                });
                this.queryErrors.add(queryError);
                LOG.error("Unhandled exception caught in streams thread {}. ({})", new Object[]{thread.getName(), type, th});
            } catch (Exception e) {
                LOG.error("Error classifying unhandled exception", e);
                throw e;
            }
        } catch (Throwable th2) {
            QueryError queryError2 = new QueryError(System.currentTimeMillis(), Throwables.getStackTraceAsString(th), type);
            this.queryStateListener.ifPresent(queryStateListener2 -> {
                queryStateListener2.onError(queryError2);
            });
            this.queryErrors.add(queryError2);
            LOG.error("Unhandled exception caught in streams thread {}. ({})", new Object[]{thread.getName(), type, th});
            throw th2;
        }
    }

    public Map<String, Object> getOverriddenProperties() {
        return this.overriddenProperties;
    }

    public String getStatementString() {
        return this.statementString;
    }

    public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        this.uncaughtExceptionHandler = uncaughtExceptionHandler;
        this.kafkaStreams.setUncaughtExceptionHandler(uncaughtExceptionHandler);
    }

    public KafkaStreams.State getState() {
        return this.kafkaStreams.state();
    }

    public boolean isError() {
        return getState() == KafkaStreams.State.ERROR;
    }

    public String getExecutionPlan() {
        return this.executionPlan;
    }

    public String getQueryApplicationId() {
        return this.queryApplicationId;
    }

    public Topology getTopology() {
        return this.topology;
    }

    public Map<String, Map<Integer, LagInfo>> getAllLocalStorePartitionLags() {
        try {
            return this.kafkaStreams.allLocalStorePartitionLags();
        } catch (IllegalStateException | StreamsException e) {
            LOG.error(e.getMessage());
            return ImmutableMap.of();
        }
    }

    public Collection<StreamsMetadata> getAllMetadata() {
        try {
            return ImmutableList.copyOf(this.kafkaStreams.allMetadata());
        } catch (IllegalStateException e) {
            LOG.error(e.getMessage());
            return ImmutableList.of();
        }
    }

    public Map<String, Object> getStreamsProperties() {
        return this.streamsProperties;
    }

    public LogicalSchema getLogicalSchema() {
        return this.logicalSchema;
    }

    public Set<SourceName> getSourceNames() {
        return this.sourceNames;
    }

    public boolean hasEverBeenStarted() {
        return this.everStarted;
    }

    public QueryId getQueryId() {
        return this.queryId;
    }

    public KsqlConstants.KsqlQueryType getQueryType() {
        return KsqlConstants.KsqlQueryType.PERSISTENT;
    }

    public String getTopologyDescription() {
        return this.topology.describe().toString();
    }

    public List<QueryError> getQueryErrors() {
        return ImmutableList.copyOf(this.queryErrors);
    }

    public long uptime() {
        return ((Long) this.queryStateListener.map((v0) -> {
            return v0.uptime();
        }).orElse(0L)).longValue();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isClosed() {
        return this.closed;
    }

    public KafkaStreams getKafkaStreams() {
        return this.kafkaStreams;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void resetKafkaStreams(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
        setUncaughtExceptionHandler(this.uncaughtExceptionHandler);
        this.queryStateListener.ifPresent(this::setQueryStateListener);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void closeKafkaStreams() {
        if (this.initialized) {
            this.kafkaStreams.close(Duration.ofMillis(this.closeTimeout.longValue()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaStreams buildKafkaStreams() {
        return this.kafkaStreamsBuilder.build(this.topology, this.streamsProperties);
    }

    public synchronized void stop() {
        doClose(false);
    }

    public void close() {
        doClose(true);
    }

    private void doClose(boolean z) {
        this.closed = true;
        closeKafkaStreams();
        if (z) {
            this.kafkaStreams.cleanUp();
        }
        this.queryStateListener.ifPresent((v0) -> {
            v0.close();
        });
        if (z) {
            this.closeCallback.accept(this);
        }
        this.onStop.accept(Boolean.valueOf(z));
    }

    public void start() {
        if (!this.initialized) {
            throw new KsqlException(String.format("Failed to initialize query %s before starting it", this.queryApplicationId));
        }
        LOG.info("Starting query with application id: {}", this.queryApplicationId);
        this.everStarted = true;
        this.kafkaStreams.start();
    }

    public void clearErrors() {
        this.queryErrors.clear();
    }
}
