package io.confluent.ksql.util;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.query.KafkaStreamsBuilder;
import io.confluent.ksql.query.QueryError;
import io.confluent.ksql.query.QueryErrorClassifier;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.util.QueryMetadataImpl;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/util/SharedKafkaStreamsRuntimeImpl.class */
/**
 * {@link SharedKafkaStreamsRuntime} implementation that starts its Kafka Streams instance during
 * construction. Queries are first {@linkplain #register registered} and then
 * {@linkplain #start started} as named topologies inside that single instance.
 *
 * <p>Uncaught stream-thread exceptions are classified, recorded on the query they originate from
 * (or on every collocated query when no single query can be identified), and the failed thread is
 * replaced.
 */
public class SharedKafkaStreamsRuntimeImpl extends SharedKafkaStreamsRuntime {
    private final Logger log = LoggerFactory.getLogger(SharedKafkaStreamsRuntimeImpl.class);

    // NOTE(review): stored but never read in this class; close() currently calls the no-argument
    // KafkaStreams.close(). Confirm whether the timeout was meant to be passed there.
    private final long shutdownTimeout;
    private final QueryErrorClassifier errorClassifier;
    private final int maxQueryErrorsQueueSize;

    // Futures of topology additions submitted by start(); stop() waits for them before removing
    // a topology. Initialised here so the list exists before the constructor starts Streams.
    private final List<KafkaFuture<Void>> topologiesToAdd = new ArrayList<>();

    /**
     * Builds the runtime and immediately starts the underlying Kafka Streams instance.
     *
     * @param kafkaStreamsBuilder     builder handed to the superclass; also used by
     *                                {@link #restartStreamsRuntime()} to create a new instance
     * @param errorClassifier         classifier applied to uncaught stream-thread exceptions
     * @param maxQueryErrorsQueueSize capacity passed to each queue created by
     *                                {@link #getNewQueryErrorQueue()}
     * @param shutdownTimeout         shutdown timeout value (stored only; see the field note)
     * @param streamsProperties       streams configuration handed to the superclass
     */
    public SharedKafkaStreamsRuntimeImpl(
            KafkaStreamsBuilder kafkaStreamsBuilder,
            QueryErrorClassifier errorClassifier,
            int maxQueryErrorsQueueSize,
            long shutdownTimeout,
            Map<String, Object> streamsProperties) {
        super(kafkaStreamsBuilder, streamsProperties);
        this.errorClassifier = errorClassifier;
        this.maxQueryErrorsQueueSize = maxQueryErrorsQueueSize;
        this.shutdownTimeout = shutdownTimeout;
        setupAndStartKafkaStreams(this.kafkaStreams);
    }

    /**
     * Adds the query to the set of collocated queries, keyed by its query id, and logs the ids of
     * all queries now held by this runtime. Registration alone does not start the query.
     *
     * @param binPackedPersistentQueryMetadataImpl the query to register
     */
    @Override
    public void register(BinPackedPersistentQueryMetadataImpl binPackedPersistentQueryMetadataImpl) {
        QueryId queryId = binPackedPersistentQueryMetadataImpl.getQueryId();
        this.collocatedQueries.put(queryId, binPackedPersistentQueryMetadataImpl);
        String registeredIds = this.collocatedQueries.keySet().stream()
                .map(QueryId::toString)
                .collect(Collectors.joining(", "));
        this.log.info(
                "Registered query: {}  in {} \nRuntime {} is executing these queries: {}",
                queryId, getApplicationId(), getApplicationId(), registeredIds);
    }

    /** Installs the uncaught-exception handler and state listener, then starts the instance. */
    private void setupAndStartKafkaStreams(KafkaStreams kafkaStreams) {
        kafkaStreams.setUncaughtExceptionHandler(this::uncaughtHandler);
        kafkaStreams.setStateListener(stateListener());
        kafkaStreams.start();
    }

    /**
     * Creates a listener that forwards every state change of the shared Streams instance to all
     * currently collocated queries.
     *
     * @return the fan-out state listener
     */
    public KafkaStreams.StateListener stateListener() {
        return (newState, oldState) -> {
            for (BinPackedPersistentQueryMetadataImpl query : this.collocatedQueries.values()) {
                query.onStateChange(newState, oldState);
            }
        };
    }

    /**
     * Handles an exception that escaped a stream thread: classifies it, records it on the affected
     * query or queries, and asks Kafka Streams to replace the thread.
     *
     * <p>If the exception itself classifies as {@code UNKNOWN} and has a cause, the cause is
     * classified instead. If classification throws an {@link Exception}, that failure is logged
     * and the error is recorded with whatever type had been determined so far ({@code UNKNOWN}
     * unless the first classification succeeded).
     *
     * @param th the uncaught exception
     * @return always {@code REPLACE_THREAD}
     */
    public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHandler(Throwable th) {
        QueryError.Type errorType = QueryError.Type.UNKNOWN;
        try {
            errorType = this.errorClassifier.classify(th);
            if (th.getCause() != null && errorType == QueryError.Type.UNKNOWN) {
                errorType = this.errorClassifier.classify(th.getCause());
            }
        } catch (Exception classificationFailure) {
            this.log.error("Error classifying unhandled exception", classificationFailure);
        } finally {
            // Record exactly once, whether classification succeeded, failed, or threw an Error.
            recordQueryError(th, errorType);
        }
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
    }

    /**
     * Stores a {@link QueryError} for {@code th} on the query it originated from, or on every
     * collocated query when the origin cannot be resolved, and logs the exception.
     */
    private void recordQueryError(Throwable th, QueryError.Type errorType) {
        QueryError queryError =
                new QueryError(System.currentTimeMillis(), Throwables.getStackTraceAsString(th), errorType);
        BinPackedPersistentQueryMetadataImpl queryInError = parseException(th);
        String threadName = Thread.currentThread().getName();

        if (queryInError != null) {
            queryInError.setQueryError(queryError);
            this.log.error(String.format("Unhandled query exception caught in streams thread %s for query %s. (%s)", threadName, queryInError.getQueryId(), errorType), th);
            return;
        }

        for (BinPackedPersistentQueryMetadataImpl query : this.collocatedQueries.values()) {
            query.setQueryError(queryError);
        }
        this.log.error(String.format("Unhandled runtime exception caught in streams thread %s. (%s)", threadName, errorType), th);
    }

    /**
     * Resolves the registered query an exception originated from, using the topology name of the
     * task id carried by a {@link StreamsException}.
     *
     * @return the matching query, or {@code null} when the exception is not a
     *         {@code StreamsException}, carries no task id, the task has no topology name, or no
     *         registered query matches that name (the latter two cases are logged)
     */
    private BinPackedPersistentQueryMetadataImpl parseException(Throwable th) {
        TaskId taskId = th instanceof StreamsException
                ? ((StreamsException) th).taskId().orElse(null)
                : null;
        if (taskId == null) {
            return null;
        }

        String topologyName = taskId.topologyName();
        if (topologyName == null) {
            this.log.error("Unhandled exception originated from a task {} without an associated topology name (queryId).", taskId);
            return null;
        }

        QueryId queryId = new QueryId(topologyName);
        BinPackedPersistentQueryMetadataImpl query = this.collocatedQueries.get(queryId);
        if (query == null) {
            this.log.error("Unhandled exception originated from a task {} with an unrecognized topology name (queryId) {}.", taskId, queryId);
        }
        return query;
    }

    /**
     * Creates a new error queue bounded to one hour and {@code maxQueryErrorsQueueSize} entries.
     *
     * @return a fresh, empty queue
     */
    @Override
    public QueryMetadataImpl.TimeBoundedQueue getNewQueryErrorQueue() {
        return new QueryMetadataImpl.TimeBoundedQueue(Duration.ofHours(1L), this.maxQueryErrorsQueueSize);
    }

    /**
     * Stops a query by removing its named topology from the shared Streams instance.
     *
     * <p>When the topology is present, all topology additions previously submitted by
     * {@link #start} are awaited first, then the topology is removed. Unless this is a
     * create-or-replace, the topology is also cleaned up and the query is unregistered.
     *
     * @param queryId           id of the query to stop
     * @param isCreateOrReplace {@code true} when the query is being replaced; the removal is then
     *                          requested with {@code false} as its second argument, and the
     *                          clean-up and unregistration are skipped
     * @throws IllegalStateException if the topology is present but Streams is neither running nor
     *                               rebalancing, or if waiting for the add/remove operations is
     *                               interrupted or fails (the interrupt flag is restored in the
     *                               former case)
     */
    @Override
    public void stop(QueryId queryId, boolean isCreateOrReplace) {
        this.log.info("Attempting to stop query: {} in runtime {} with isCreateOrReplace={}",
                queryId, getApplicationId(), isCreateOrReplace);

        String topologyName = queryId.toString();
        boolean topologyPresent = this.kafkaStreams.getTopologyByName(topologyName).isPresent();
        if (topologyPresent != this.collocatedQueries.containsKey(queryId)) {
            this.log.error("Non SandBoxed queries should not be registered and never started.");
        }

        if (topologyPresent) {
            if (!this.kafkaStreams.state().isRunningOrRebalancing()) {
                throw new IllegalStateException("Streams in not running but is in state " + this.kafkaStreams.state());
            }
            try {
                // Let every in-flight addition finish before removing anything.
                for (KafkaFuture<Void> pendingAddition : this.topologiesToAdd) {
                    pendingAddition.get();
                }
                this.topologiesToAdd.clear();
                this.kafkaStreams.removeNamedTopology(topologyName, !isCreateOrReplace).all().get();
                if (!isCreateOrReplace) {
                    this.kafkaStreams.cleanUpNamedTopology(topologyName);
                }
            } catch (InterruptedException | ExecutionException e) {
                if (e instanceof InterruptedException) {
                    // Preserve the interrupt for callers further up the stack.
                    Thread.currentThread().interrupt();
                }
                throw new IllegalStateException(String.format("Encountered an error when trying to stop query %s in runtime: %s", queryId, getApplicationId()), e.getCause() == null ? e : e.getCause());
            }
        }

        if (!isCreateOrReplace) {
            this.collocatedQueries.remove(queryId);
        }
        this.log.info("Query {} was stopped successfully", queryId);
    }

    /** Closes the shared Streams instance and then calls its {@code cleanUp()}. */
    @Override
    public synchronized void close() {
        this.kafkaStreams.close();
        this.kafkaStreams.cleanUp();
    }

    /**
     * Starts a registered query by submitting its topology to the shared Streams instance. The
     * addition is not awaited here; its future is kept and awaited by {@link #stop}.
     *
     * @param queryId id of the query to start
     * @throws IllegalArgumentException if the query is not registered, is flagged as
     *                                  {@code everStarted}, or a topology with the same name is
     *                                  still present in the Streams instance
     */
    @Override
    public void start(QueryId queryId) {
        this.log.info("Attempting to start query {} in runtime {}", queryId, getApplicationId());

        BinPackedPersistentQueryMetadataImpl query = this.collocatedQueries.get(queryId);
        if (query == null || query.everStarted) {
            throw new IllegalArgumentException("Cannot start because query " + queryId + " was not registered to runtime " + getApplicationId());
        }
        if (this.kafkaStreams.getTopologyByName(queryId.toString()).isPresent()) {
            throw new IllegalArgumentException("Cannot start because Streams is not done terminating an older version of query : " + queryId);
        }

        this.topologiesToAdd.add(this.kafkaStreams.addNamedTopology(query.mo276getTopology()).all());
        this.log.info("Query {} was started successfully", queryId);
    }

    /**
     * Replaces the stored streams properties with an immutable copy of {@code map}, after
     * carrying over the current {@code application.server} value. The carried-over value is
     * written into the supplied map itself.
     *
     * @param map the new properties; must be modifiable
     */
    @Override
    public void overrideStreamsProperties(Map<String, Object> map) {
        map.put("application.server", this.streamsProperties.get("application.server"));
        this.streamsProperties = ImmutableMap.copyOf(map);
    }

    /**
     * Replaces the Streams instance with a newly built one using the current streams properties.
     *
     * <p>The topologies known to the old instance are captured, the old instance is closed, and
     * each captured topology's query gets a fresh topology copy that is added to the new instance
     * before it is started. A captured topology without a registered query is logged and skipped.
     */
    @Override
    public void restartStreamsRuntime() {
        this.log.info("Restarting runtime {}", getApplicationId());
        Collection<NamedTopology> liveTopologies = this.kafkaStreams.getAllTopologies();
        this.kafkaStreams.close();

        KafkaStreamsNamedTopologyWrapper restarted =
                this.kafkaStreamsBuilder.buildNamedTopologyWrapper(this.streamsProperties);
        this.kafkaStreams = restarted;

        for (NamedTopology topology : liveTopologies) {
            QueryId queryId = new QueryId(topology.name());
            BinPackedPersistentQueryMetadataImpl query = this.collocatedQueries.get(queryId);
            if (query == null) {
                // Without the query metadata there is nothing to rebuild the topology from.
                this.log.error("Cannot restore topology {} while restarting runtime {}: no registered query matches it.", queryId, getApplicationId());
                continue;
            }
            query.updateTopology(query.getTopologyCopy(this));
            restarted.addNamedTopology(query.mo276getTopology());
        }

        // Installs the exception handler and state listener, then starts the new instance.
        setupAndStartKafkaStreams(restarted);
    }
}
