package io.confluent.ksql.rest.server.computation;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.exception.ExceptionUtil;
import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.ExecutableDdlStatement;
import io.confluent.ksql.parser.tree.InsertInto;
import io.confluent.ksql.parser.tree.TerminateQuery;
import io.confluent.ksql.planner.plan.ConfiguredKsqlPlan;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.SpecificQueryIdGenerator;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.entity.CommandStatus;
import io.confluent.ksql.rest.server.StatementParser;
import io.confluent.ksql.rest.server.resources.KsqlConfigurable;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.PersistentQueryMetadata;
import io.confluent.ksql.util.QueryMetadata;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/rest/server/computation/InteractiveStatementExecutor.class */
/**
 * Executes queued KSQL commands against a {@link KsqlEngine} and keeps the most
 * recent {@link CommandStatus} of every command it has handled.
 *
 * <p>{@link #configure(KsqlConfig)} must be called before any command is handled;
 * otherwise the handle methods throw {@link IllegalStateException}.
 */
public class InteractiveStatementExecutor implements KsqlConfigurable {
    private static final Logger log = LoggerFactory.getLogger(InteractiveStatementExecutor.class);

    private final ServiceContext serviceContext;
    private final KsqlEngine ksqlEngine;
    private final StatementParser statementParser;
    private final SpecificQueryIdGenerator queryIdGenerator;
    private final Map<CommandId, CommandStatus> statusStore;
    private KsqlConfig ksqlConfig;

    /**
     * How a command is being processed. Queries produced by a command are only
     * started in {@link #EXECUTE} mode; in {@link #RESTORE} mode they are built
     * but left un-started.
     */
    public enum Mode {
        RESTORE,
        EXECUTE
    }

    /**
     * Creates an executor that parses statements with a {@link StatementParser}
     * built on the supplied engine.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    public InteractiveStatementExecutor(ServiceContext serviceContext, KsqlEngine ksqlEngine, SpecificQueryIdGenerator specificQueryIdGenerator) {
        this(serviceContext, ksqlEngine, new StatementParser(ksqlEngine), specificQueryIdGenerator);
    }

    /**
     * Creates an executor with an explicitly supplied statement parser.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @VisibleForTesting
    InteractiveStatementExecutor(ServiceContext serviceContext, KsqlEngine ksqlEngine, StatementParser statementParser, SpecificQueryIdGenerator specificQueryIdGenerator) {
        this.serviceContext = Objects.requireNonNull(serviceContext, "serviceContext");
        this.ksqlEngine = Objects.requireNonNull(ksqlEngine, "ksqlEngine");
        this.statementParser = Objects.requireNonNull(statementParser, "statementParser");
        this.queryIdGenerator = Objects.requireNonNull(specificQueryIdGenerator, "queryIdGenerator");
        this.statusStore = new ConcurrentHashMap<>();
    }

    /**
     * Stores the configuration used when executing commands.
     *
     * @param ksqlConfig the configuration; its streams properties must contain
     *                   the {@code application.server} key
     * @throws IllegalArgumentException if {@code application.server} is not set
     */
    @Override // io.confluent.ksql.rest.server.resources.KsqlConfigurable
    public void configure(KsqlConfig ksqlConfig) {
        if (!ksqlConfig.getKsqlStreamConfigProps().containsKey("application.server")) {
            throw new IllegalArgumentException("Need KS application server set");
        }
        this.ksqlConfig = ksqlConfig;
    }

    /** Returns the engine this executor runs commands against. */
    public KsqlEngine getKsqlEngine() {
        return this.ksqlEngine;
    }

    /**
     * Handles a queued command in {@link Mode#EXECUTE} mode, i.e. any query it
     * produces is started.
     *
     * @throws IllegalStateException if {@link #configure(KsqlConfig)} has not been called
     * @throws KsqlException         if handling fails; an ERROR status is recorded first
     */
    public void handleStatement(QueuedCommand queuedCommand) {
        throwIfNotConfigured();
        handleStatementWithTerminatedQueries(
            queuedCommand.getCommand(),
            queuedCommand.getCommandId(),
            queuedCommand.getStatus(),
            Mode.EXECUTE,
            queuedCommand.getOffset().longValue());
    }

    /**
     * Handles a queued command in {@link Mode#RESTORE} mode, i.e. any query it
     * produces is built but not started.
     *
     * @throws IllegalStateException if {@link #configure(KsqlConfig)} has not been called
     * @throws KsqlException         if handling fails; an ERROR status is recorded first
     */
    public void handleRestore(QueuedCommand queuedCommand) {
        throwIfNotConfigured();
        handleStatementWithTerminatedQueries(
            queuedCommand.getCommand(),
            queuedCommand.getCommandId(),
            queuedCommand.getStatus(),
            Mode.RESTORE,
            queuedCommand.getOffset().longValue());
    }

    /** Returns a snapshot copy of the latest status of every handled command. */
    public Map<CommandId, CommandStatus> getStatuses() {
        return new HashMap<>(this.statusStore);
    }

    /** Returns the latest recorded status of the command, or empty if none is recorded. */
    public Optional<CommandStatus> getStatus(CommandId commandId) {
        return Optional.ofNullable(this.statusStore.get(commandId));
    }

    /** Guards the handle methods against use before {@link #configure(KsqlConfig)}. */
    private void throwIfNotConfigured() {
        if (this.ksqlConfig == null) {
            throw new IllegalStateException("Not initialized");
        }
    }

    /** Records an intermediate status and forwards it to the status future, if any. */
    private void putStatus(CommandId commandId, Optional<CommandStatusFuture> statusFuture, CommandStatus status) {
        this.statusStore.put(commandId, status);
        statusFuture.ifPresent(future -> future.setStatus(status));
    }

    /** Records a status and forwards it to the status future, if any, as its final status. */
    private void putFinalStatus(CommandId commandId, Optional<CommandStatusFuture> statusFuture, CommandStatus status) {
        this.statusStore.put(commandId, status);
        statusFuture.ifPresent(future -> future.setFinalStatus(status));
    }

    /**
     * Runs a command either from its attached plan, when present, or by parsing
     * and executing its statement text, updating the command status along the way.
     *
     * <p>On {@link KsqlException} the failure is logged, an ERROR status carrying
     * the stack trace is recorded via {@code putStatus}, and the exception is rethrown.
     */
    private void handleStatementWithTerminatedQueries(Command command, CommandId commandId, Optional<CommandStatusFuture> statusFuture, Mode mode, long offset) {
        try {
            final Optional<KsqlPlan> plan = command.getPlan();
            if (plan.isPresent()) {
                executePlan(command, commandId, statusFuture, plan.get(), mode, offset);
                return;
            }

            final String statementText = command.getStatement();
            putStatus(commandId, statusFuture, new CommandStatus(CommandStatus.Status.PARSING, "Parsing statement"));
            final KsqlParser.PreparedStatement<?> prepared = this.statementParser.parseSingleStatement(statementText);
            putStatus(commandId, statusFuture, new CommandStatus(CommandStatus.Status.EXECUTING, "Executing statement"));
            executeStatement(prepared, command, commandId, statusFuture, mode, offset);
        } catch (KsqlException e) {
            log.error("Failed to handle: {}", command, e);
            putStatus(commandId, statusFuture, new CommandStatus(CommandStatus.Status.ERROR, ExceptionUtil.stackTraceToString(e)));
            throw e;
        }
    }

    /**
     * Executes a pre-built plan. If the execution yields a query, the query id
     * generator is moved to {@code offset + 1} and, in EXECUTE mode, the query is started.
     */
    private void executePlan(Command command, CommandId commandId, Optional<CommandStatusFuture> statusFuture, KsqlPlan ksqlPlan, Mode mode, long offset) {
        final ConfiguredKsqlPlan configuredPlan =
            ConfiguredKsqlPlan.of(ksqlPlan, command.getOverwriteProperties(), buildMergedConfig(command));
        putStatus(commandId, statusFuture, new CommandStatus(CommandStatus.Status.EXECUTING, "Executing statement"));

        final KsqlExecutionContext.ExecuteResult result = this.ksqlEngine.execute(this.serviceContext, configuredPlan);
        result.getQuery().ifPresent(query -> {
            this.queryIdGenerator.setNextId(offset + 1);
            if (mode == Mode.EXECUTE) {
                query.start();
            }
        });

        putFinalStatus(commandId, statusFuture, new CommandStatus(CommandStatus.Status.SUCCESS, getSuccessMessage(result)));
    }

    /**
     * Builds the success message for a plan execution: the command result when
     * there is one, otherwise a message naming the id of the created query.
     */
    private String getSuccessMessage(KsqlExecutionContext.ExecuteResult executeResult) {
        final Optional<String> commandResult = executeResult.getCommandResult();
        if (commandResult.isPresent()) {
            return commandResult.get();
        }
        // Without a command result the execution is expected to have produced a query.
        return "Created query with ID " + ((PersistentQueryMetadata) executeResult.getQuery().get()).getQueryId();
    }

    /**
     * Dispatches a parsed statement by type (DDL, CREATE ... AS SELECT, INSERT INTO,
     * TERMINATE) and records a final SUCCESS status with a type-specific message.
     *
     * @throws KsqlException if the statement is of none of the supported types
     */
    private void executeStatement(KsqlParser.PreparedStatement<?> preparedStatement, Command command, CommandId commandId, Optional<CommandStatusFuture> statusFuture, Mode mode, long offset) {
        final String successMessage;
        if (preparedStatement.getStatement() instanceof ExecutableDdlStatement) {
            successMessage = executeDdlStatement(preparedStatement, command);
        } else if (preparedStatement.getStatement() instanceof CreateAsSelect) {
            final PersistentQueryMetadata query = startQuery(preparedStatement, command, mode, offset);
            final String sourceName = ((CreateAsSelect) preparedStatement.getStatement()).getName().name();
            final String sourceKind = preparedStatement.getStatement() instanceof CreateTableAsSelect ? "Table " : "Stream ";
            successMessage = sourceKind + sourceName + " created and running"
                + ". Created by query with query ID: " + query.getQueryId();
        } else if (preparedStatement.getStatement() instanceof InsertInto) {
            successMessage = "Insert Into query is running with query ID: "
                + startQuery(preparedStatement, command, mode, offset).getQueryId();
        } else if (preparedStatement.getStatement() instanceof TerminateQuery) {
            // Safe: the instanceof check above establishes the statement type.
            @SuppressWarnings("unchecked")
            final KsqlParser.PreparedStatement<TerminateQuery> terminate =
                (KsqlParser.PreparedStatement<TerminateQuery>) preparedStatement;
            terminateQuery(terminate);
            successMessage = "Query terminated.";
        } else {
            // Report the class of the statement itself; the wrapper's class is always the same.
            throw new KsqlException(String.format(
                "Unexpected statement type: %s",
                preparedStatement.getStatement().getClass().getName()));
        }
        putFinalStatus(commandId, statusFuture, new CommandStatus(CommandStatus.Status.SUCCESS, successMessage));
    }

    /** Plans and executes a DDL statement and returns the engine's command result. */
    private String executeDdlStatement(KsqlParser.PreparedStatement<?> preparedStatement, Command command) {
        return planAndExecute(preparedStatement, command).getCommandResult().get();
    }

    /**
     * Plans and executes a query-producing statement, moves the query id generator
     * to {@code offset + 1}, and starts the query when in EXECUTE mode.
     *
     * @throws IllegalStateException if the execution produced no query
     * @throws KsqlException         if the produced query is not a {@link PersistentQueryMetadata}
     */
    private PersistentQueryMetadata startQuery(KsqlParser.PreparedStatement<?> preparedStatement, Command command, Mode mode, long offset) {
        final QueryMetadata queryMetadata = planAndExecute(preparedStatement, command)
            .getQuery()
            .orElseThrow(() -> new IllegalStateException("Statement did not return a query"));

        this.queryIdGenerator.setNextId(offset + 1);

        if (!(queryMetadata instanceof PersistentQueryMetadata)) {
            throw new KsqlException(String.format(
                "Unexpected query metadata type: %s; was expecting %s",
                queryMetadata.getClass().getCanonicalName(),
                PersistentQueryMetadata.class.getCanonicalName()));
        }

        final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata) queryMetadata;
        if (mode == Mode.EXECUTE) {
            persistentQuery.start();
        }
        return persistentQuery;
    }

    /** Plans the statement with the command's merged config and executes the resulting plan. */
    private KsqlExecutionContext.ExecuteResult planAndExecute(KsqlParser.PreparedStatement<?> preparedStatement, Command command) {
        final KsqlConfig mergedConfig = buildMergedConfig(command);
        final KsqlPlan plan = this.ksqlEngine.plan(
            this.serviceContext,
            ConfiguredStatement.of(preparedStatement, command.getOverwriteProperties(), mergedConfig));
        return this.ksqlEngine.execute(
            this.serviceContext,
            ConfiguredKsqlPlan.of(plan, command.getOverwriteProperties(), mergedConfig));
    }

    /** Applies the command's original properties on top of the configured {@link KsqlConfig}. */
    private KsqlConfig buildMergedConfig(Command command) {
        return this.ksqlConfig.overrideBreakingConfigsWithOriginalValues(command.getOriginalProperties());
    }

    /**
     * Closes the persistent query named by the statement, if the engine knows it;
     * when the statement names no query, closes every persistent query in the engine.
     */
    private void terminateQuery(KsqlParser.PreparedStatement<TerminateQuery> preparedStatement) {
        final Optional<QueryId> queryId = preparedStatement.getStatement().getQueryId();
        if (!queryId.isPresent()) {
            this.ksqlEngine.getPersistentQueries().forEach(QueryMetadata::close);
            return;
        }
        this.ksqlEngine.getPersistentQuery(queryId.get()).ifPresent(QueryMetadata::close);
    }
}
