package io.confluent.ksql.rest.server.computation;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.confluent.ksql.rest.Errors;
import io.confluent.ksql.rest.server.resources.IncompatibleKsqlCommandVersionException;
import io.confluent.ksql.rest.server.state.ServerState;
import io.confluent.ksql.rest.util.ClusterTerminator;
import io.confluent.ksql.rest.util.PersistentQueryCleanupImpl;
import io.confluent.ksql.rest.util.TerminateCluster;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.RetryUtil;
import java.io.Closeable;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.Temporal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/computation/CommandRunner.class */
public class CommandRunner implements Closeable {
    // Value used as the second argument of RetryUtil.retryWithBackoff when running commands
    // (presumably the initial backoff in milliseconds — confirm against RetryUtil).
    private static final int STATEMENT_RETRY_MS = 100;
    // Third argument of RetryUtil.retryWithBackoff (presumably the backoff cap in milliseconds — confirm).
    private static final int MAX_STATEMENT_RETRY_MS = 5000;
    // How long closeEarly() waits, in milliseconds, for the executor to terminate.
    private static final int SHUTDOWN_TIMEOUT_MS = 15000;
    // Executes restored commands (handleRestore) and newly polled commands (handleStatement).
    private final InteractiveStatementExecutor statementExecutor;
    // Source of restore commands and new commands; also reports corruption and is woken/closed on shutdown.
    private final CommandQueue commandStore;
    // Runs the single Runner task submitted by start().
    private final ExecutorService executor;
    // Applied to the restore commands in processPriorCommands() before they are replayed.
    private final Function<List<QueuedCommand>, List<QueuedCommand>> compactor;
    // Set by closeEarly(); read by the Runner loop and by command execution to stop early.
    private volatile boolean closed;
    // First argument of RetryUtil.retryWithBackoff for every command execution.
    private final int maxRetries;
    // Invoked by terminateCluster() with the topic list taken from the terminate command.
    private final ClusterTerminator clusterTerminator;
    // Marked terminating/terminated by terminateCluster(); exposed through checkServerState().
    private final ServerState serverState;
    // Metrics object constructed with a reference to this runner; closed in close().
    private final CommandRunnerMetrics commandRunnerMetric;
    // The command currently being executed together with the instant it started; null when idle.
    private final AtomicReference<Pair<QueuedCommand, Instant>> currentCommandRef;
    // Instant of the most recent fetchAndRunCommands() call; null until the first poll.
    private final AtomicReference<Instant> lastPollTime;
    // Maximum time a single command may be in flight before checkCommandRunnerStatus() reports ERROR.
    private final Duration commandRunnerHealthTimeout;
    // Time source for all instants recorded and compared by this class.
    private final Clock clock;
    // Used to deserialize commands when looking for, and acting on, a terminate-cluster command.
    private final Deserializer<Command> commandDeserializer;
    // Called once per command in checkForIncompatibleCommands(); a SerializationException or
    // IncompatibleKsqlCommandVersionException from it marks the command as incompatible.
    private final Consumer<QueuedCommand> incompatibleCommandChecker;
    // Supplies the text returned by getCommandRunnerDegradedWarning().
    private final Errors errorHandler;
    // Set by checkForIncompatibleCommands(); makes the Runner enter the DEGRADED state.
    // NOTE(review): not volatile although it appears to be written and read on different threads — confirm.
    private boolean incompatibleCommandDetected;
    // Consulted by fetchAndRunCommands() when a poll returns no commands.
    private final Supplier<Boolean> commandTopicExists;
    // Set by fetchAndRunCommands() when a poll is empty and commandTopicExists reports false.
    private boolean commandTopicDeleted;
    // Most recently computed status; written by the Runner and by checkCommandRunnerStatus().
    private Status state;
    private static final Logger LOG = LoggerFactory.getLogger(CommandRunner.class);
    // Timeout handed to CommandQueue.getNewCommands(); three times this value is the poll-staleness
    // threshold in checkCommandRunnerStatus().
    private static final Duration NEW_CMDS_TIMEOUT = Duration.ofMillis(5000);

    /* loaded from: input_file:io/confluent/ksql/rest/server/computation/CommandRunner$CommandRunnerDegradedReason.class */
    /**
     * Why the command runner is in the {@code DEGRADED} state. Each constant carries a factory
     * that produces the matching message from an {@link Errors} instance.
     */
    public enum CommandRunnerDegradedReason {
        /** Not degraded; the message is always the empty string. */
        NONE(errors -> ""),
        /** Message is {@link Errors#commandRunnerDegradedCorruptedErrorMessage()}. */
        CORRUPTED(Errors::commandRunnerDegradedCorruptedErrorMessage),
        /** Message is {@link Errors#commandRunnerDegradedIncompatibleCommandsErrorMessage()}. */
        INCOMPATIBLE_COMMAND(Errors::commandRunnerDegradedIncompatibleCommandsErrorMessage);

        private final Function<Errors, String> msgFactory;

        // The parameter is fully typed: with a raw Function the constants' lambdas would see an
        // Object argument and the assignment below would be an unchecked conversion.
        CommandRunnerDegradedReason(final Function<Errors, String> msgFactory) {
            this.msgFactory = msgFactory;
        }

        /**
         * Builds the message for this reason.
         *
         * @param errors the message source handed to this constant's factory
         * @return the message produced by the factory
         */
        public String getMsg(Errors errors) {
            return this.msgFactory.apply(errors);
        }
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/computation/CommandRunner$CommandRunnerStatus.class */
    /** Health of the command runner as reported by {@code checkCommandRunnerStatus()}. */
    public enum CommandRunnerStatus {
        /** Initial status; also reported whenever the timing checks pass. */
        RUNNING,
        /**
         * Reported when the in-flight command has exceeded the health timeout, or when no command
         * is in flight and the last poll is at least three poll timeouts old.
         */
        ERROR,
        /**
         * Set by the runner thread after an incompatible command, detected corruption, command
         * topic deletion or a command topic offset reset.
         */
        DEGRADED
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/computation/CommandRunner$Runner.class */
    /**
     * Task submitted to the executor by {@code start()}. Until the runner is closed it either
     * polls the command topic and executes what it finds, or moves the runner into the
     * {@code DEGRADED} state when a fatal condition has been flagged.
     *
     * <p>The command store is closed exactly once, in a single {@code finally} block, whichever
     * way the loop ends.
     *
     * <p>NOTE(review): {@code closeEarly()} awaits termination of the executor that runs this
     * task, so when it is called from this thread it presumably waits out the full shutdown
     * timeout — confirm this is intended.
     */
    private class Runner implements Runnable {
        private Runner() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                while (!closed) {
                    if (incompatibleCommandDetected) {
                        enterDegradedState(
                            "CommandRunner entering degraded state due to encountering an incompatible command",
                            CommandRunnerDegradedReason.INCOMPATIBLE_COMMAND);
                    } else if (commandStore.corruptionDetected()) {
                        enterDegradedState(
                            "CommandRunner entering degraded state due to encountering corruption between topic and backup",
                            CommandRunnerDegradedReason.CORRUPTED);
                    } else if (commandTopicDeleted) {
                        // Topic deletion is reported with the CORRUPTED reason; there is no dedicated one.
                        enterDegradedState(
                            "CommandRunner entering degraded state due to command topic deletion.",
                            CommandRunnerDegradedReason.CORRUPTED);
                    } else {
                        LOG.trace("Polling for new writes to command topic");
                        fetchAndRunCommands();
                    }
                }
            } catch (WakeupException wakeup) {
                // A wakeup is the expected way to interrupt a poll during shutdown; anything
                // else is unexpected and must propagate.
                if (!closed) {
                    throw wakeup;
                }
            } catch (OffsetOutOfRangeException offsetReset) {
                enterDegradedState(
                    "The command topic offset was reset. CommandRunner thread exiting.",
                    CommandRunnerDegradedReason.CORRUPTED);
            } finally {
                LOG.info("Closing command store");
                commandStore.close();
            }
        }

        /** Logs the warning, records the DEGRADED status with the given reason and stops the runner. */
        private void enterDegradedState(String warning, CommandRunnerDegradedReason reason) {
            LOG.warn(warning);
            state = new Status(CommandRunnerStatus.DEGRADED, reason);
            closeEarly();
        }
    }

    /* loaded from: input_file:io/confluent/ksql/rest/server/computation/CommandRunner$Status.class */
    /** Immutable pairing of a runner status with the reason it is degraded, if it is. */
    public static class Status {
        private final CommandRunnerStatus status;
        private final CommandRunnerDegradedReason degradedReason;

        /**
         * @param commandRunnerStatus the status to report
         * @param commandRunnerDegradedReason the reason to report alongside the status
         */
        public Status(CommandRunnerStatus commandRunnerStatus, CommandRunnerDegradedReason commandRunnerDegradedReason) {
            status = commandRunnerStatus;
            degradedReason = commandRunnerDegradedReason;
        }

        /** @return the reason passed to the constructor */
        public CommandRunnerDegradedReason getDegradedReason() {
            return degradedReason;
        }

        /** @return the status passed to the constructor */
        public CommandRunnerStatus getStatus() {
            return status;
        }
    }

    /**
     * Creates a runner with production defaults: a single-thread executor whose thread is named
     * "CommandRunner", the system UTC clock, {@code RestoreCommandsCompactor::compact} as the
     * compactor, and an incompatibility check that deserializes each command's id and body.
     *
     * @param interactiveStatementExecutor executes restored and newly polled commands
     * @param commandQueue source of commands
     * @param i maximum number of retries per command
     * @param clusterTerminator invoked when a terminate-cluster command is found
     * @param serverState server state updated during cluster termination
     * @param str forwarded as the first argument of {@code CommandRunnerMetrics}
     * @param duration how long a command may run before the runner is reported as ERROR
     * @param str2 forwarded as the third argument of {@code CommandRunnerMetrics}
     * @param deserializer deserializer for command bodies
     * @param errors source of degraded-state messages
     * @param kafkaTopicClient used to check whether the command topic still exists
     * @param str3 name of the topic whose existence is checked
     * @param metrics forwarded to {@code CommandRunnerMetrics}
     */
    public CommandRunner(InteractiveStatementExecutor interactiveStatementExecutor, CommandQueue commandQueue, int i, ClusterTerminator clusterTerminator, ServerState serverState, String str, Duration duration, String str2, Deserializer<Command> deserializer, Errors errors, KafkaTopicClient kafkaTopicClient, String str3, Metrics metrics) {
        this(
            interactiveStatementExecutor,
            commandQueue,
            i,
            clusterTerminator,
            Executors.newSingleThreadExecutor(task -> new Thread(task, "CommandRunner")),
            serverState,
            str,
            duration,
            str2,
            Clock.systemUTC(),
            RestoreCommandsCompactor::compact,
            candidate -> {
                // Both deserializations are performed only for the exceptions they may raise.
                candidate.getAndDeserializeCommandId();
                candidate.getAndDeserializeCommand(deserializer);
            },
            deserializer,
            errors,
            () -> kafkaTopicClient.isTopicExists(str3),
            metrics);
    }

    /**
     * Fully parameterized constructor; the public constructor delegates here.
     *
     * @param interactiveStatementExecutor executes restored and newly polled commands; not null
     * @param commandQueue source of commands; not null
     * @param i maximum number of retries per command
     * @param clusterTerminator invoked when a terminate-cluster command is found; not null
     * @param executorService executor that runs the polling task; not null
     * @param serverState server state updated during cluster termination; not null
     * @param str forwarded as the first argument of {@code CommandRunnerMetrics}
     * @param duration how long a command may run before the runner is reported as ERROR; not null
     * @param str2 forwarded as the third argument of {@code CommandRunnerMetrics}
     * @param clock time source; not null
     * @param function compactor applied to restore commands; not null
     * @param consumer per-command incompatibility check; not null
     * @param deserializer deserializer for command bodies; not null
     * @param errors source of degraded-state messages; not null
     * @param supplier reports whether the command topic exists; not null
     * @param metrics forwarded to {@code CommandRunnerMetrics}
     * @throws NullPointerException if any argument documented as not null is null
     */
    @VisibleForTesting
    CommandRunner(InteractiveStatementExecutor interactiveStatementExecutor, CommandQueue commandQueue, int i, ClusterTerminator clusterTerminator, ExecutorService executorService, ServerState serverState, String str, Duration duration, String str2, Clock clock, Function<List<QueuedCommand>, List<QueuedCommand>> function, Consumer<QueuedCommand> consumer, Deserializer<Command> deserializer, Errors errors, Supplier<Boolean> supplier, Metrics metrics) {
        // Objects.requireNonNull is generic, so no casts are needed.
        this.statementExecutor = Objects.requireNonNull(interactiveStatementExecutor, "statementExecutor");
        this.commandStore = Objects.requireNonNull(commandQueue, "commandStore");
        this.maxRetries = i;
        this.clusterTerminator = Objects.requireNonNull(clusterTerminator, "clusterTerminator");
        this.executor = Objects.requireNonNull(executorService, "executor");
        this.serverState = Objects.requireNonNull(serverState, "serverState");
        this.commandRunnerHealthTimeout = Objects.requireNonNull(duration, "commandRunnerHealthTimeout");
        this.clock = Objects.requireNonNull(clock, "clock");
        this.compactor = Objects.requireNonNull(function, "compactor");
        this.incompatibleCommandChecker = Objects.requireNonNull(consumer, "incompatibleCommandChecker");
        this.commandDeserializer = Objects.requireNonNull(deserializer, "commandDeserializer");
        this.errorHandler = Objects.requireNonNull(errors, "errorHandler");
        this.commandTopicExists = Objects.requireNonNull(supplier, "commandTopicExists");

        this.currentCommandRef = new AtomicReference<>(null);
        this.lastPollTime = new AtomicReference<>(null);
        this.state = new Status(CommandRunnerStatus.RUNNING, CommandRunnerDegradedReason.NONE);
        this.closed = false;
        this.incompatibleCommandDetected = false;
        this.commandTopicDeleted = false;

        // Built last because it receives a reference to this runner: every field it might read
        // back is assigned by now, and nothing is created if an argument check above fails.
        this.commandRunnerMetric = new CommandRunnerMetrics(str, this, str2, metrics);
    }

    /**
     * Submits the polling task and then shuts the executor down, so it accepts no further tasks
     * and terminates once that task returns.
     */
    public void start() {
        final Runner pollingTask = new Runner();
        this.executor.execute(pollingTask);
        this.executor.shutdown();
    }

    /**
     * Stops the runner if it has not been stopped already, then closes the metrics. The metrics
     * are closed on every call, including repeated ones.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        final boolean stillOpen = !this.closed;
        if (stillOpen) {
            closeEarly();
        }
        this.commandRunnerMetric.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Marks the runner as closed, wakes the command store and waits up to
     * {@code SHUTDOWN_TIMEOUT_MS} milliseconds for the executor to terminate. Unlike
     * {@link #close()}, it leaves the metrics open. If the wait is interrupted, the thread's
     * interrupt flag is restored.
     */
    public void closeEarly() {
        this.closed = true;
        this.commandStore.wakeup();
        try {
            this.executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Restores state from the commands already in the command store.
     *
     * <p>Commands are first checked for compatibility. If a terminate-cluster command is among
     * them, the cluster is terminated and nothing else happens. Otherwise the compacted commands
     * are replayed with retries. Afterwards the persistent queries are started and leaked queries
     * cleaned up, or, if corruption was detected, each query is marked with a corruption error.
     *
     * @param persistentQueryCleanupImpl used to clean up leaked queries after a successful restart
     */
    public void processPriorCommands(PersistentQueryCleanupImpl persistentQueryCleanupImpl) {
        try {
            final List<QueuedCommand> restorable = checkForIncompatibleCommands(this.commandStore.getRestoreCommands());
            LOG.info("Restoring previous state from {} commands.", restorable.size());

            final Optional<QueuedCommand> terminate = findTerminateCommand(restorable, this.commandDeserializer);
            if (terminate.isPresent()) {
                LOG.info("Cluster previously terminated: terminating.");
                terminateCluster(terminate.get().getAndDeserializeCommand(this.commandDeserializer));
                return;
            }

            for (final QueuedCommand restoreCommand : this.compactor.apply(restorable)) {
                // Published so the health check can see how long this command has been running.
                this.currentCommandRef.set(new Pair<>(restoreCommand, this.clock.instant()));
                RetryUtil.retryWithBackoff(this.maxRetries, STATEMENT_RETRY_MS, MAX_STATEMENT_RETRY_MS, () -> {
                    this.statementExecutor.handleRestore(restoreCommand);
                }, new Class[]{WakeupException.class});
                this.currentCommandRef.set(null);
            }

            final List<PersistentQueryMetadata> queries = this.statementExecutor.getKsqlEngine().getPersistentQueries();
            if (this.commandStore.corruptionDetected()) {
                LOG.info("Corruption detected, queries will not be started.");
                for (final PersistentQueryMetadata query : queries) {
                    query.setCorruptionQueryError();
                }
            } else {
                LOG.info("Restarting {} queries.", queries.size());
                for (final PersistentQueryMetadata query : queries) {
                    query.start();
                }
                persistentQueryCleanupImpl.cleanupLeakedQueries(queries);
            }
            LOG.info("Restore complete");
        } catch (Exception restoreFailure) {
            LOG.error("Error during restore", restoreFailure);
            throw restoreFailure;
        }
    }

    /**
     * Polls the command store once and executes whatever it returns.
     *
     * <p>An empty poll triggers a check that the command topic still exists; if it does not, the
     * deletion is flagged. A terminate-cluster command among the polled commands terminates the
     * cluster instead of executing anything. Execution stops early once the runner is closed.
     */
    void fetchAndRunCommands() {
        this.lastPollTime.set(this.clock.instant());
        final List<QueuedCommand> polled = this.commandStore.getNewCommands(NEW_CMDS_TIMEOUT);
        if (polled.isEmpty()) {
            if (!this.commandTopicExists.get()) {
                this.commandTopicDeleted = true;
            }
            return;
        }

        final List<QueuedCommand> compatible = checkForIncompatibleCommands(polled);
        final Optional<QueuedCommand> terminate = findTerminateCommand(compatible, this.commandDeserializer);
        if (terminate.isPresent()) {
            terminateCluster(terminate.get().getAndDeserializeCommand(this.commandDeserializer));
            return;
        }

        LOG.debug("Found {} new writes to command topic", compatible.size());
        for (final QueuedCommand next : compatible) {
            if (this.closed) {
                break;
            }
            executeStatement(next);
        }
    }

    /**
     * Executes one command with retries. While it runs, the command and its start instant are
     * published in {@code currentCommandRef}. Each attempt is skipped if the runner has been
     * closed in the meantime.
     *
     * @param queuedCommand the command to execute
     */
    private void executeStatement(QueuedCommand queuedCommand) {
        final String id = queuedCommand.getAndDeserializeCommandId().toString();
        LOG.info("Executing statement: " + id);

        final Runnable attempt = () -> {
            if (!this.closed) {
                this.statementExecutor.handleStatement(queuedCommand);
                LOG.info("Executed statement: " + id);
                return;
            }
            LOG.info("Execution aborted as system is closing down");
        };

        this.currentCommandRef.set(new Pair<>(queuedCommand, this.clock.instant()));
        RetryUtil.retryWithBackoff(this.maxRetries, STATEMENT_RETRY_MS, MAX_STATEMENT_RETRY_MS, attempt, new Class[]{WakeupException.class});
        this.currentCommandRef.set(null);
    }

    /**
     * Finds the first command whose statement text equals, ignoring case, the terminate-cluster
     * statement.
     *
     * @param list commands to search, in order
     * @param deserializer deserializer used to read each command's statement
     * @return the first matching command, or empty if there is none
     */
    private static Optional<QueuedCommand> findTerminateCommand(List<QueuedCommand> list, Deserializer<Command> deserializer) {
        for (final QueuedCommand candidate : list) {
            final String statement = candidate.getAndDeserializeCommand(deserializer).getStatement();
            if (statement.equalsIgnoreCase(TerminateCluster.TERMINATE_CLUSTER_STATEMENT_TEXT)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * Carries out a terminate-cluster command: marks the server as terminating, closes this
     * runner, asks the cluster terminator to terminate with the command's topic list, marks the
     * server as terminated and finally runs the early-close sequence again.
     *
     * @param command the terminate command; its overwrite properties may carry a
     *     "deleteTopicList" entry, and an empty list is used when they do not
     */
    private void terminateCluster(Command command) {
        this.serverState.setTerminating();
        LOG.info("Terminating the KSQL server.");
        // Stops the runner (if not already stopped) and closes the metrics before termination starts.
        close();
        this.clusterTerminator.terminateCluster((List) command.getOverwriteProperties().getOrDefault("deleteTopicList", Collections.emptyList()));
        this.serverState.setTerminated();
        LOG.info("The KSQL server was terminated.");
        // NOTE(review): close() above has already run closeEarly() unless the runner was closed
        // beforehand; this second call repeats the wakeup and the termination wait — confirm it is needed.
        closeEarly();
        LOG.debug("The KSQL command runner was closed.");
    }

    /**
     * Returns the leading run of commands that pass the incompatibility check.
     *
     * <p>Commands are checked in order. The first one for which the checker throws a
     * {@link SerializationException} or {@link IncompatibleKsqlCommandVersionException} ends the
     * scan. That command and every later one are left out, and
     * {@code incompatibleCommandDetected} is set.
     *
     * @param list commands to check, in order
     * @return a new list holding the commands that preceded the first incompatible one, or all
     *     of them if none was incompatible
     */
    private List<QueuedCommand> checkForIncompatibleCommands(List<QueuedCommand> list) {
        // Typed list: a raw ArrayList made the return an unchecked conversion.
        final List<QueuedCommand> compatible = new ArrayList<>();
        try {
            for (final QueuedCommand queuedCommand : list) {
                this.incompatibleCommandChecker.accept(queuedCommand);
                compatible.add(queuedCommand);
            }
        } catch (SerializationException | IncompatibleKsqlCommandVersionException e) {
            LOG.info("Incompatible command record detected when processing command topic", e);
            this.incompatibleCommandDetected = true;
        }
        return compatible;
    }

    /**
     * Exposes the command queue this runner reads from. The same instance is returned, not a copy.
     *
     * @return the command queue passed to the constructor
     */
    @SuppressFBWarnings(value = {"EI_EXPOSE_REP"}, justification = "should be mutable")
    public CommandQueue getCommandQueue() {
        final CommandQueue queue = this.commandStore;
        return queue;
    }

    /**
     * Recomputes and returns the runner's status.
     *
     * <p>{@code DEGRADED} is sticky: once set it is returned without recomputation. Otherwise the
     * runner is healthy when the in-flight command has run for less than the health timeout. With
     * no command in flight, it is healthy when no poll has happened yet or the last poll is less
     * than three poll timeouts old. An unhealthy runner is reported as {@code ERROR}.
     *
     * @return the current status
     */
    public CommandRunnerStatus checkCommandRunnerStatus() {
        if (this.state.getStatus() == CommandRunnerStatus.DEGRADED) {
            return CommandRunnerStatus.DEGRADED;
        }

        final Pair<QueuedCommand, Instant> inFlight = this.currentCommandRef.get();
        final boolean healthy;
        if (inFlight != null) {
            final long runningForMs = Duration.between(inFlight.right, this.clock.instant()).toMillis();
            healthy = runningForMs < this.commandRunnerHealthTimeout.toMillis();
        } else if (this.lastPollTime.get() == null) {
            // Nothing has been polled yet, so there is nothing to be late for.
            healthy = true;
        } else {
            final long sinceLastPollMs = Duration.between(this.lastPollTime.get(), this.clock.instant()).toMillis();
            healthy = sinceLastPollMs < NEW_CMDS_TIMEOUT.toMillis() * 3;
        }

        final CommandRunnerStatus computed = healthy ? CommandRunnerStatus.RUNNING : CommandRunnerStatus.ERROR;
        this.state = new Status(computed, CommandRunnerDegradedReason.NONE);
        return this.state.getStatus();
    }

    /**
     * Reports the state of the server this runner belongs to.
     *
     * @return the state currently held by the {@code ServerState} passed to the constructor
     */
    public ServerState.State checkServerState() {
        final ServerState.State current = this.serverState.getState();
        return current;
    }

    /**
     * Reports the degraded reason of the most recently recorded status.
     *
     * @return the recorded reason; {@code NONE} unless the runner has been degraded
     */
    public CommandRunnerDegradedReason getCommandRunnerDegradedReason() {
        final Status current = this.state;
        return current.getDegradedReason();
    }

    /**
     * Builds the warning text for the current degraded reason using this runner's error handler.
     *
     * @return the message for the current reason; the empty string when the reason is {@code NONE}
     */
    public String getCommandRunnerDegradedWarning() {
        final CommandRunnerDegradedReason reason = getCommandRunnerDegradedReason();
        return reason.getMsg(this.errorHandler);
    }
}
