package io.confluent.ksql.engine;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.io.Closeable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.streams.KafkaStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/engine/QueryMonitor.class */
public class QueryMonitor implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(QueryMonitor.class);
    private static final Random random = new Random();
    private static final Ticker CURRENT_TIME_MILLIS_TICKER = new Ticker() { // from class: io.confluent.ksql.engine.QueryMonitor.1
        public long read() {
            return System.currentTimeMillis();
        }
    };
    private static final int SHUTDOWN_TIMEOUT_MS = 5000;
    private final Ticker ticker;
    private final long retryBackoffInitialMs;
    private final long retryBackoffMaxMs;
    private final long statusRunningThresholdMs;
    private final KsqlEngine ksqlEngine;
    private final ExecutorService executor;
    private final Map<QueryId, RetryEvent> queriesRetries;
    private volatile boolean closed;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.confluent.ksql.engine.QueryMonitor$2, reason: invalid class name */
    /* loaded from: input_file:io/confluent/ksql/engine/QueryMonitor$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$streams$KafkaStreams$State = new int[KafkaStreams.State.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$streams$KafkaStreams$State[KafkaStreams.State.ERROR.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$streams$KafkaStreams$State[KafkaStreams.State.RUNNING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$streams$KafkaStreams$State[KafkaStreams.State.REBALANCING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$streams$KafkaStreams$State[KafkaStreams.State.CREATED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/ksql/engine/QueryMonitor$RetryEvent.class */
    public static class RetryEvent {
        private final QueryId queryId;
        private final Ticker ticker;
        private int numRetries = 0;
        private long waitingTimeMs;
        private long expiryTimeMs;
        private long retryBackoffMaxMs;
        private long baseWaitingTimeMs;

        RetryEvent(QueryId queryId, long j, long j2, Ticker ticker) {
            this.queryId = queryId;
            this.ticker = ticker;
            long read = ticker.read();
            this.baseWaitingTimeMs = j;
            this.waitingTimeMs = j;
            this.retryBackoffMaxMs = j2;
            this.expiryTimeMs = read + j;
        }

        public long nextRestartTimeMs() {
            return this.expiryTimeMs;
        }

        public int getNumRetries() {
            return this.numRetries;
        }

        public void restart(PersistentQueryMetadata persistentQueryMetadata) {
            this.numRetries++;
            QueryMonitor.LOG.info("Restarting query {} (attempt #{})", this.queryId, Integer.valueOf(this.numRetries));
            try {
                persistentQueryMetadata.restart();
            } catch (Exception e) {
                QueryMonitor.LOG.warn("Failed restarting query {}. Error = {}", this.queryId, e.getMessage());
            }
            long read = this.ticker.read();
            this.waitingTimeMs = getWaitingTimeMs();
            this.expiryTimeMs = Math.max(read, read + this.waitingTimeMs);
        }

        private long getWaitingTimeMs() {
            if (this.waitingTimeMs * 2 < this.retryBackoffMaxMs) {
                return this.waitingTimeMs * 2;
            }
            return this.retryBackoffMaxMs + QueryMonitor.random.nextInt(((int) this.baseWaitingTimeMs) / 2);
        }
    }

    /* loaded from: input_file:io/confluent/ksql/engine/QueryMonitor$Runner.class */
    private class Runner implements Runnable {
        private Runner() {
        }

        @Override // java.lang.Runnable
        public void run() {
            QueryMonitor.LOG.info("KSQL query monitor started.");
            while (!QueryMonitor.this.closed) {
                QueryMonitor.this.restartFailedQueries();
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    QueryMonitor.LOG.info("QueryMonitor sleep thread interrupted. Terminating QueryMonitor. Error = {}", e.getMessage());
                    return;
                }
            }
        }
    }

    public QueryMonitor(KsqlConfig ksqlConfig, KsqlEngine ksqlEngine) {
        this(ksqlEngine, Executors.newSingleThreadExecutor(runnable -> {
            return new Thread(runnable, QueryMonitor.class.getName());
        }), ksqlConfig.getLong("ksql.query.retry.backoff.initial.ms").longValue(), ksqlConfig.getLong("ksql.query.retry.backoff.max.ms").longValue(), ksqlConfig.getInt("ksql.query.status.running.threshold.seconds").intValue() * 1000, CURRENT_TIME_MILLIS_TICKER);
    }

    @VisibleForTesting
    QueryMonitor(KsqlEngine ksqlEngine, ExecutorService executorService, long j, long j2, long j3, Ticker ticker) {
        this.queriesRetries = new HashMap();
        this.closed = false;
        this.retryBackoffInitialMs = j;
        this.retryBackoffMaxMs = j2;
        this.statusRunningThresholdMs = j3;
        this.ksqlEngine = ksqlEngine;
        this.executor = executorService;
        this.ticker = ticker;
    }

    public void start() {
        this.executor.execute(new Runner());
        this.executor.shutdown();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closed = true;
        try {
            this.executor.awaitTermination(5000L, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isClosed() {
        return this.closed;
    }

    void restartFailedQueries() {
        this.ksqlEngine.getPersistentQueries().stream().filter((v0) -> {
            return v0.isError();
        }).filter(persistentQueryMetadata -> {
            return !this.queriesRetries.containsKey(persistentQueryMetadata.getQueryId());
        }).map((v0) -> {
            return v0.getQueryId();
        }).forEach(queryId -> {
            this.queriesRetries.put(queryId, newRetryEvent(queryId));
        });
        maybeRestartQueries();
    }

    private void maybeRestartQueries() {
        long read = this.ticker.read();
        Iterator<Map.Entry<QueryId, RetryEvent>> it = this.queriesRetries.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<QueryId, RetryEvent> next = it.next();
            QueryId key = next.getKey();
            RetryEvent value = next.getValue();
            Optional<PersistentQueryMetadata> persistentQuery = this.ksqlEngine.getPersistentQuery(key);
            if (persistentQuery.isPresent()) {
                PersistentQueryMetadata persistentQueryMetadata = persistentQuery.get();
                switch (AnonymousClass2.$SwitchMap$org$apache$kafka$streams$KafkaStreams$State[persistentQueryMetadata.getState().ordinal()]) {
                    case 1:
                        if (read < value.nextRestartTimeMs()) {
                            break;
                        } else {
                            value.restart(persistentQueryMetadata);
                            break;
                        }
                    case 2:
                    case 3:
                        if (persistentQueryMetadata.uptime() < this.statusRunningThresholdMs) {
                            break;
                        } else {
                            LOG.info("Query {} has been running for more than {} seconds. Marking query as healthy.", key, Long.valueOf(this.statusRunningThresholdMs * 1000));
                            persistentQueryMetadata.clearErrors();
                            it.remove();
                            break;
                        }
                    case 4:
                        break;
                    default:
                        LOG.debug("Query {} is in status {}. Removing from query retry monitor.", key, persistentQueryMetadata.getState());
                        it.remove();
                        break;
                }
            } else {
                LOG.debug("Query {} was manually terminated. Removing from query retry monitor.", key);
                it.remove();
            }
        }
    }

    private RetryEvent newRetryEvent(QueryId queryId) {
        return new RetryEvent(queryId, this.retryBackoffInitialMs, this.retryBackoffMaxMs, this.ticker);
    }
}
