package io.confluent.ksql.util;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.base.Ticker;
import com.google.common.collect.EvictingQueue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.logging.processing.ProcessingLoggerFactory;
import io.confluent.ksql.logging.query.QueryLogger;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.query.KafkaStreamsBuilder;
import io.confluent.ksql.query.QueryError;
import io.confluent.ksql.query.QueryErrorClassifier;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.rest.entity.StreamsTaskMetadata;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.QueryMetadata;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.LagInfo;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/util/QueryMetadataImpl.class */
public class QueryMetadataImpl implements QueryMetadata {
    // Set to true by pause() and back to false by resume(); consulted by getQueryStatus().
    private final AtomicBoolean isPaused;
    private final String statementString;
    private final String executionPlan;
    private final String queryApplicationId;
    private final Topology topology;
    // Used by initialize() to build the KafkaStreams instance from topology + streamsProperties.
    private final KafkaStreamsBuilder kafkaStreamsBuilder;
    private final ImmutableMap<String, Object> streamsProperties;
    private final ImmutableMap<String, Object> overriddenProperties;
    private final ImmutableSet<SourceName> sourceNames;
    private final LogicalSchema logicalSchema;
    // Timeout handed to KafkaStreams.close(...) in closeKafkaStreams().
    private final Duration closeTimeout;
    private final QueryId queryId;
    // Classifies uncaught stream-thread exceptions into a QueryError.Type (see uncaughtHandler).
    private final QueryErrorClassifier errorClassifier;
    // Recent errors recorded by uncaughtHandler() and setCorruptionQueryError().
    private final TimeBoundedQueue queryErrors;
    // Back-off state applied by uncaughtHandler() before a stream thread is replaced.
    private final RetryEvent retryEvent;
    // Notified of errors, state changes, pause/resume and close.
    private final QueryMetadata.Listener listener;
    // Source of the processing loggers that close() shuts down for this query id.
    private final ProcessingLoggerFactory loggerFactory;
    // Set to true by start(); copied from the source instance by the copy constructor.
    private volatile boolean everStarted;
    // Assigned by resetKafkaStreams() (via initialize()) or copied by the copy constructor;
    // null until then for instances created through the main constructor.
    private volatile KafkaStreams kafkaStreams;
    // Set to true only by initialize(). NOTE(review): not volatile, unlike the two fields above —
    // confirm whether cross-thread visibility is required.
    private boolean initialized;
    // Set by setCorruptionQueryError(); while true, getState() reports ERROR.
    private boolean corruptionCommandTopic;
    private static final Logger LOG = LoggerFactory.getLogger(QueryMetadataImpl.class);
    // Ticker whose read() returns wall-clock time in milliseconds (System.currentTimeMillis()).
    private static final Ticker CURRENT_TIME_MILLIS_TICKER = new Ticker() { // from class: io.confluent.ksql.util.QueryMetadataImpl.1
        public long read() {
            return System.currentTimeMillis();
        }
    };

    /**
     * Tracks how often each thread of a query has been restarted and applies a doubling
     * back-off, capped at a maximum, before the next restart.
     *
     * <p>NOTE(review): {@code waitingTimeMs} and {@code expiryTimeMs} are plain fields; if
     * {@link #backOff(String)} can run on several threads at once, confirm whether they need
     * synchronization.
     */
    /* loaded from: input_file:io/confluent/ksql/util/QueryMetadataImpl$RetryEvent.class */
    public static class RetryEvent implements QueryMetadata.RetryEvent {
        private final Ticker ticker;
        private final QueryId queryId;
        // Restart count keyed by the name passed to backOff().
        private final Map<String, Integer> numRetries = new ConcurrentHashMap<>();
        private final long retryBackoffMaxMs;
        private long waitingTimeMs;
        private long expiryTimeMs;

        /**
         * @param queryId id of the query, used only for logging
         * @param j initial waiting time in milliseconds; it is doubled before the first sleep
         * @param j2 upper bound for the waiting time in milliseconds
         * @param ticker time source; its readings are added to millisecond waits, so it is
         *               presumably expected to report milliseconds — verify against callers
         */
        RetryEvent(QueryId queryId, long j, long j2, Ticker ticker) {
            this.ticker = ticker;
            this.queryId = queryId;
            final long now = ticker.read();
            this.waitingTimeMs = j;
            this.retryBackoffMaxMs = j2;
            this.expiryTimeMs = now + j;
        }

        /**
         * @return the ticker reading at which the most recently computed wait ends
         */
        @Override // io.confluent.ksql.util.QueryMetadata.RetryEvent
        public long nextRestartTimeMs() {
            return expiryTimeMs;
        }

        /**
         * @param str thread name
         * @return how many times {@link #backOff(String)} has been called for that name,
         *         or 0 if never
         */
        @Override // io.confluent.ksql.util.QueryMetadata.RetryEvent
        public int getNumRetries(String str) {
            return numRetries.getOrDefault(str, 0);
        }

        /**
         * Doubles the waiting time (up to the configured maximum), sleeps for that long on the
         * calling thread, then records and logs the restart attempt for the given thread name.
         *
         * <p>If the sleep is interrupted, the interrupt flag is restored and the method carries
         * on without sleeping further.
         *
         * @param str name of the thread being restarted
         */
        @Override // io.confluent.ksql.util.QueryMetadata.RetryEvent
        public void backOff(String str) {
            // Sampled before sleeping, so the expiry computed below is roughly "when the sleep ends".
            final long now = ticker.read();
            waitingTimeMs = getWaitingTimeMs();
            try {
                Thread.sleep(waitingTimeMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            final int retries = numRetries.merge(str, 1, Integer::sum);
            QueryMetadataImpl.LOG.info(
                    "Restarting query {} thread {} (attempt #{})", queryId, str, retries);
            // Math.max guards against now + waitingTimeMs wrapping around to a negative value.
            expiryTimeMs = Math.max(now, now + waitingTimeMs);
        }

        /**
         * @return twice the current waiting time, capped at {@code retryBackoffMaxMs}
         */
        private long getWaitingTimeMs() {
            // Doubling would overflow long; the true doubled value exceeds any cap, so use the cap.
            if (waitingTimeMs > Long.MAX_VALUE / 2) {
                return retryBackoffMaxMs;
            }
            return Math.min(waitingTimeMs * 2, retryBackoffMaxMs);
        }
    }

    /**
     * Queue of {@link QueryError}s bounded both by element count and by age.
     *
     * <p>The backing store is a Guava {@link EvictingQueue}, which drops its oldest element when
     * a new one is added at capacity. Entries whose timestamp is at or before
     * {@code now - duration} are dropped on every {@link #add} and {@link #toImmutableList}.
     * All access to the backing queue happens while holding this object's monitor.
     */
    /* loaded from: input_file:io/confluent/ksql/util/QueryMetadataImpl$TimeBoundedQueue.class */
    public static class TimeBoundedQueue {
        private final Duration duration;
        private final Queue<QueryError> queue;

        /**
         * @param duration maximum age of a retained error
         * @param i maximum number of retained errors; {@code EvictingQueue.create} rejects
         *          negative values
         */
        /* JADX INFO: Access modifiers changed from: package-private */
        public TimeBoundedQueue(Duration duration, int i) {
            // Keep the EvictingQueue itself: copying it into another collection would discard
            // the capacity bound.
            this.queue = EvictingQueue.create(i);
            this.duration = duration;
        }

        /**
         * Appends an error, then drops any entries that have aged out.
         *
         * @param queryError the error to record
         */
        public synchronized void add(QueryError queryError) {
            queue.add(queryError);
            evict();
        }

        /**
         * @return an immutable snapshot of the errors that have not aged out, oldest first
         */
        public synchronized List<QueryError> toImmutableList() {
            evict();
            return ImmutableList.copyOf(queue);
        }

        /** Removes expired entries from the head of the queue. Callers must hold the monitor. */
        private void evict() {
            final long cutoff = System.currentTimeMillis() - duration.toMillis();
            QueryError oldest = queue.peek();
            while (oldest != null && oldest.getTimestamp() <= cutoff) {
                queue.poll();
                oldest = queue.peek();
            }
        }
    }

    /**
     * Creates metadata for a query that has not been initialized yet: no {@link KafkaStreams}
     * instance exists until {@link #initialize()} is called.
     *
     * @param str the statement text; must not be null
     * @param logicalSchema the query's logical schema; must not be null
     * @param set names of the query's sources; must not be null (copied)
     * @param str2 the execution plan text; must not be null
     * @param str3 the query application id; must not be null
     * @param topology the topology handed to the streams builder; must not be null
     * @param kafkaStreamsBuilder builder used by {@link #initialize()}; must not be null
     * @param map streams properties; must not be null (copied)
     * @param map2 overridden properties; must not be null (copied)
     * @param j close timeout in milliseconds
     * @param queryId the query id; must not be null
     * @param queryErrorClassifier classifier for uncaught errors; must not be null
     * @param i capacity passed to the error queue
     * @param j2 initial retry waiting time in milliseconds
     * @param j3 maximum retry waiting time in milliseconds
     * @param listener receiver of query lifecycle callbacks; must not be null
     * @param processingLoggerFactory logger factory used on close; must not be null
     * @throws NullPointerException if any argument documented as non-null is null
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public QueryMetadataImpl(String str, LogicalSchema logicalSchema, Set<SourceName> set, String str2, String str3, Topology topology, KafkaStreamsBuilder kafkaStreamsBuilder, Map<String, Object> map, Map<String, Object> map2, long j, QueryId queryId, QueryErrorClassifier queryErrorClassifier, int i, long j2, long j3, QueryMetadata.Listener listener, ProcessingLoggerFactory processingLoggerFactory) {
        this.isPaused = new AtomicBoolean(false);
        this.everStarted = false;
        this.initialized = false;
        this.corruptionCommandTopic = false;
        // The label passed to requireNonNull becomes the NullPointerException message, so each
        // one names the field being checked.
        this.statementString = Objects.requireNonNull(str, "statementString");
        this.executionPlan = Objects.requireNonNull(str2, "executionPlan");
        this.queryApplicationId = Objects.requireNonNull(str3, "queryApplicationId");
        this.topology = Objects.requireNonNull(topology, "topology");
        this.kafkaStreamsBuilder = Objects.requireNonNull(kafkaStreamsBuilder, "kafkaStreamsBuilder");
        this.streamsProperties = ImmutableMap.copyOf(Objects.requireNonNull(map, "streamsProperties"));
        this.overriddenProperties = ImmutableMap.copyOf(Objects.requireNonNull(map2, "overriddenProperties"));
        this.listener = Objects.requireNonNull(listener, "listener");
        this.sourceNames = ImmutableSet.copyOf(Objects.requireNonNull(set, "sourceNames"));
        this.logicalSchema = Objects.requireNonNull(logicalSchema, "logicalSchema");
        this.closeTimeout = Duration.ofMillis(j);
        this.queryId = Objects.requireNonNull(queryId, "queryId");
        this.errorClassifier = Objects.requireNonNull(queryErrorClassifier, "errorClassifier");
        // Errors are retained for at most one hour and at most i entries.
        this.queryErrors = new TimeBoundedQueue(Duration.ofHours(1L), i);
        this.retryEvent = new RetryEvent(queryId, j2, j3, CURRENT_TIME_MILLIS_TICKER);
        this.loggerFactory = Objects.requireNonNull(processingLoggerFactory, "loggerFactory");
    }

    /**
     * Creates a copy of an existing query's metadata with a different listener.
     *
     * <p>The copy shares the source's {@link KafkaStreams} instance, builder, classifier and
     * logger factory, and inherits its {@code everStarted} flag. It starts out not paused and
     * not initialized. Its error queue is created with zero duration and zero capacity, and its
     * retry event with zero waiting times and a ticker that always reads 0.
     *
     * @param queryMetadataImpl the instance to copy from
     * @param listener receiver of lifecycle callbacks for the copy; must not be null
     * @throws NullPointerException if {@code listener} is null
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public QueryMetadataImpl(QueryMetadataImpl queryMetadataImpl, QueryMetadata.Listener listener) {
        this.isPaused = new AtomicBoolean(false);
        this.initialized = false;
        this.corruptionCommandTopic = false;
        this.statementString = queryMetadataImpl.getStatementString();
        this.kafkaStreams = queryMetadataImpl.getKafkaStreams();
        this.executionPlan = queryMetadataImpl.getExecutionPlan();
        this.queryApplicationId = queryMetadataImpl.getQueryApplicationId();
        this.topology = queryMetadataImpl.mo275getTopology();
        this.kafkaStreamsBuilder = queryMetadataImpl.kafkaStreamsBuilder;
        this.streamsProperties = queryMetadataImpl.mo284getStreamsProperties();
        this.overriddenProperties = queryMetadataImpl.mo285getOverriddenProperties();
        this.sourceNames = queryMetadataImpl.mo283getSourceNames();
        this.logicalSchema = queryMetadataImpl.getLogicalSchema();
        this.closeTimeout = queryMetadataImpl.closeTimeout;
        this.queryId = queryMetadataImpl.getQueryId();
        this.errorClassifier = queryMetadataImpl.errorClassifier;
        this.everStarted = queryMetadataImpl.everStarted;
        this.queryErrors = new TimeBoundedQueue(Duration.ZERO, 0);
        this.retryEvent = new RetryEvent(queryMetadataImpl.getQueryId(), 0L, 0L, new Ticker() { // from class: io.confluent.ksql.util.QueryMetadataImpl.2
            public long read() {
                return 0L;
            }
        });
        // The label is the NullPointerException message; it names the parameter being checked.
        this.listener = Objects.requireNonNull(listener, "listener");
        this.loggerFactory = queryMetadataImpl.loggerFactory;
    }

    /**
     * Builds the {@link KafkaStreams} instance from the topology and streams properties,
     * installs it together with its handlers, and marks this query as initialized.
     */
    @Override
    public void initialize() {
        final KafkaStreams built = kafkaStreamsBuilder.build(topology, streamsProperties);
        resetKafkaStreams(built);
        initialized = true;
    }

    /**
     * Handles an exception that escaped a streams thread: logs it, classifies it, reports it to
     * the listener and the error queue, backs off, and asks for the thread to be replaced.
     *
     * <p>If classification throws an {@link Exception}, the failure is logged and the error is
     * reported with type {@code UNKNOWN}. The error is reported exactly once, whether or not
     * classification succeeds.
     *
     * @param th the uncaught exception
     * @return always {@code REPLACE_THREAD}
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse uncaughtHandler(Throwable th) {
        QueryError.Type type = QueryError.Type.UNKNOWN;
        try {
            QueryLogger.error(String.format("Uncaught exception in query %s", th), statementString);
            type = errorClassifier.classify(th);
        } catch (Exception e) {
            LOG.error("Error classifying unhandled exception", e);
        } finally {
            // Runs on every path out of the try (success, classification failure, or a
            // non-Exception Throwable) so the listener, the error queue and the log always see
            // the error, with type UNKNOWN when classification did not complete.
            final QueryError queryError =
                    new QueryError(System.currentTimeMillis(), Throwables.getStackTraceAsString(th), type);
            listener.onError(this, queryError);
            queryErrors.add(queryError);
            LOG.error("Unhandled exception caught in streams thread {}. ({})",
                    Thread.currentThread().getName(), type, th);
        }
        // Kept outside the try so a failure while backing off does not re-report the error.
        retryEvent.backOff(Thread.currentThread().getName());
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
    }

    /**
     * Collects the active tasks of every local streams thread.
     *
     * @return the active tasks, each converted with
     *         {@code StreamsTaskMetadata.fromStreamsTaskMetadata}
     */
    @Override
    public Set<StreamsTaskMetadata> getTaskMetadata() {
        return kafkaStreams.metadataForLocalThreads()
                .stream()
                .flatMap(thread -> thread.activeTasks().stream())
                .map(StreamsTaskMetadata::fromStreamsTaskMetadata)
                .collect(Collectors.toSet());
    }

    /**
     * @return the immutable map of overridden properties captured at construction
     */
    @Override
    @SuppressFBWarnings(value = {"EI_EXPOSE_REP"}, justification = "overriddenProperties is ImmutableMap")
    /* renamed from: getOverriddenProperties, reason: merged with bridge method [inline-methods] */
    public ImmutableMap<String, Object> mo285getOverriddenProperties() {
        return overriddenProperties;
    }

    /**
     * @return the statement text this query was created with
     */
    @Override
    public String getStatementString() {
        return statementString;
    }

    /**
     * Installs the given handler on the current {@link KafkaStreams} instance.
     *
     * @param streamsUncaughtExceptionHandler the handler to install
     */
    @Override
    public void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
        final KafkaStreams streams = kafkaStreams;
        streams.setUncaughtExceptionHandler(streamsUncaughtExceptionHandler);
    }

    /**
     * @return {@code ERROR} once a command-topic corruption has been recorded, otherwise the
     *         state reported by the {@link KafkaStreams} instance
     */
    @Override
    public KafkaStreams.State getState() {
        if (corruptionCommandTopic) {
            return KafkaStreams.State.ERROR;
        }
        return kafkaStreams.state();
    }

    /**
     * @return the execution plan text this query was created with
     */
    @Override
    public String getExecutionPlan() {
        return executionPlan;
    }

    /**
     * @return the application id this query was created with
     */
    @Override
    public String getQueryApplicationId() {
        return queryApplicationId;
    }

    /**
     * @return the topology this query was created with
     */
    @Override
    /* renamed from: getTopology */
    public Topology mo275getTopology() {
        return topology;
    }

    /**
     * Fetches the lag information of all local store partitions.
     *
     * @return the lags reported by the {@link KafkaStreams} instance, or an empty map if it
     *         throws {@link IllegalStateException} or {@link StreamsException} (only the
     *         exception message is logged)
     */
    @Override
    public Map<String, Map<Integer, LagInfo>> getAllLocalStorePartitionLags() {
        Map<String, Map<Integer, LagInfo>> lags;
        try {
            lags = kafkaStreams.allLocalStorePartitionLags();
        } catch (IllegalStateException | StreamsException e) {
            LOG.error(e.getMessage());
            lags = ImmutableMap.of();
        }
        return lags;
    }

    /**
     * Fetches the metadata of all streams clients.
     *
     * @return an immutable copy of the metadata reported by the {@link KafkaStreams} instance,
     *         or an empty list if it throws {@link IllegalStateException} (only the exception
     *         message is logged)
     */
    @Override
    public Collection<StreamsMetadata> getAllStreamsHostMetadata() {
        Collection<StreamsMetadata> hosts;
        try {
            hosts = ImmutableList.copyOf(kafkaStreams.metadataForAllStreamsClients());
        } catch (IllegalStateException e) {
            LOG.error(e.getMessage());
            hosts = ImmutableList.of();
        }
        return hosts;
    }

    /**
     * @return the immutable map of streams properties captured at construction
     */
    @Override
    @SuppressFBWarnings(value = {"EI_EXPOSE_REP"}, justification = "streamsProperties is ImmutableMap")
    /* renamed from: getStreamsProperties, reason: merged with bridge method [inline-methods] */
    public ImmutableMap<String, Object> mo284getStreamsProperties() {
        return streamsProperties;
    }

    /**
     * @return the logical schema this query was created with
     */
    @Override
    public LogicalSchema getLogicalSchema() {
        return logicalSchema;
    }

    /**
     * @return the immutable set of source names captured at construction
     */
    @Override
    @SuppressFBWarnings(value = {"EI_EXPOSE_REP"}, justification = "sourceNames is ImmutableSet")
    /* renamed from: getSourceNames, reason: merged with bridge method [inline-methods] */
    public ImmutableSet<SourceName> mo283getSourceNames() {
        return sourceNames;
    }

    /**
     * @return {@code true} once {@link #start()} has got past its initialization check, or if
     *         this instance was copied from one for which that was the case
     */
    @Override
    public boolean hasEverBeenStarted() {
        return everStarted;
    }

    /**
     * @return the id this query was created with
     */
    @Override
    public QueryId getQueryId() {
        return queryId;
    }

    /**
     * @return always {@code PERSISTENT} for this implementation
     */
    @Override
    public KsqlConstants.KsqlQueryType getQueryType() {
        final KsqlConstants.KsqlQueryType type = KsqlConstants.KsqlQueryType.PERSISTENT;
        return type;
    }

    /**
     * @return the string form of the topology's description
     */
    @Override
    public String getTopologyDescription() {
        final Object description = topology.describe();
        return description.toString();
    }

    /**
     * @return an immutable snapshot of the errors currently held in the error queue
     */
    @Override
    public List<QueryError> getQueryErrors() {
        final List<QueryError> snapshot = queryErrors.toImmutableList();
        return snapshot;
    }

    /**
     * Records a {@code USER} error saying the query was not started because of command-topic
     * corruption, notifies the listener, and flags this query so that {@link #getState()}
     * reports {@code ERROR} from now on.
     */
    @Override
    public void setCorruptionQueryError() {
        final long now = System.currentTimeMillis();
        final QueryError corruption = new QueryError(
                now,
                "Query not started due to corruption in the command topic.",
                QueryError.Type.USER);
        listener.onError(this, corruption);
        queryErrors.add(corruption);
        corruptionCommandTopic = true;
    }

    /**
     * @return the current {@link KafkaStreams} instance; {@code null} if none has been
     *         assigned yet
     */
    @Override
    @SuppressFBWarnings({"EI_EXPOSE_REP"})
    public KafkaStreams getKafkaStreams() {
        return kafkaStreams;
    }

    /**
     * @return the listener this query reports its lifecycle events to
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public QueryMetadata.Listener getListener() {
        return listener;
    }

    /**
     * Makes the given instance the current {@link KafkaStreams}, routes its uncaught
     * exceptions to {@link #uncaughtHandler(Throwable)}, and forwards its state changes to the
     * listener.
     *
     * @param kafkaStreams the instance to install
     */
    private void resetKafkaStreams(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
        // Goes through the setter, which acts on the field assigned on the previous line.
        setUncaughtExceptionHandler(this::uncaughtHandler);
        kafkaStreams.setStateListener(
                (newState, oldState) -> listener.onStateChange(this, newState, oldState));
    }

    /**
     * Closes the {@link KafkaStreams} instance, waiting up to the configured close timeout.
     *
     * <p>Nothing is closed when this query was never initialized.
     *
     * @return {@code true} if the query was never initialized or {@link #getState()} reports
     *         {@code NOT_RUNNING} after the close call; {@code false} otherwise (a warning is
     *         logged)
     */
    protected boolean closeKafkaStreams() {
        if (initialized) {
            kafkaStreams.close(closeTimeout);
            final boolean stopped = getState().equals(KafkaStreams.State.NOT_RUNNING);
            if (!stopped) {
                LOG.warn("query has not terminated even after close. This may happen when streams threads are hung. State: " + getState());
            }
            return stopped;
        }
        return true;
    }

    /**
     * Closes this query: first the processing loggers whose prefix is this query's id, then
     * the streams instance (with clean-up), and finally notifies the listener.
     */
    @Override
    public void close() {
        final String loggerPrefix = queryId.toString();
        loggerFactory.getLoggersWithPrefix(loggerPrefix).forEach(logger -> logger.close());
        doClose(true);
        listener.onClose(this);
    }

    /**
     * Closes the streams instance and optionally cleans up after it.
     *
     * <p>Clean-up is skipped, with a warning, when {@link #closeKafkaStreams()} reports that
     * the close did not succeed.
     *
     * @param z whether to call {@code cleanUp()} on the streams instance after a successful
     *          close
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public void doClose(boolean z) {
        final boolean closed = closeKafkaStreams();
        if (!closed) {
            LOG.warn("Query has not successfully closed, skipping cleanup");
            return;
        }
        if (z) {
            // NOTE(review): closeKafkaStreams() also returns true when the query was never
            // initialized, in which case kafkaStreams may still be null here — confirm callers.
            kafkaStreams.cleanUp();
        }
    }

    /**
     * @return {@code true} once {@link #initialize()} has completed
     */
    public boolean isInitialized() {
        return initialized;
    }

    /**
     * Starts the underlying {@link KafkaStreams} instance.
     *
     * <p>Before starting, the listener is sent a state-change notification in which both the
     * old and the new state are the instance's current state.
     *
     * @throws KsqlException if {@link #initialize()} has not been called first
     */
    @Override
    public void start() {
        if (initialized) {
            LOG.info("Starting query with application id: {}", queryApplicationId);
            everStarted = true;
            listener.onStateChange(this, kafkaStreams.state(), kafkaStreams.state());
            kafkaStreams.start();
            return;
        }
        throw new KsqlException(
                String.format("Failed to initialize query %s before starting it", queryApplicationId));
    }

    /**
     * @return {@code PAUSED} while this query is paused, otherwise the status that
     *         {@code KsqlConstants.fromStreamsState} derives from {@link #getState()}
     */
    @Override
    public KsqlConstants.KsqlQueryStatus getQueryStatus() {
        if (isPaused.get()) {
            return KsqlConstants.KsqlQueryStatus.PAUSED;
        }
        return KsqlConstants.fromStreamsState(getState());
    }

    /**
     * Pauses the underlying {@link KafkaStreams} instance, marks this query as paused, and
     * notifies the listener, in that order.
     */
    @Override
    public void pause() {
        kafkaStreams.pause();
        isPaused.set(true);
        listener.onPause(this);
    }

    /**
     * Resumes the underlying {@link KafkaStreams} instance, clears the paused mark, and
     * notifies the listener, in that order.
     */
    @Override
    public void resume() {
        kafkaStreams.resume();
        isPaused.set(false);
        listener.onResume(this);
    }
}
